You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/31 08:52:19 UTC

[impala] branch master updated: IMPALA-10579: Fix usage of RemoteIterator in FileSystemUtil

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d839e4  IMPALA-10579: Fix usage of RemoteIterator in FileSystemUtil
1d839e4 is described below

commit 1d839e423e51b05314e3dbfd790cb1fa7fc82d98
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Thu Mar 11 10:10:28 2021 +0800

    IMPALA-10579: Fix usage of RemoteIterator in FileSystemUtil
    
    HDFS FileSystem provides a listStatusIterator() API for listing remote
    storage using a RemoteIterator. We use it to list files when loading
    table file metadata.
    
    It's not guaranteed that a RemoteIterator can survive when its hasNext()
    or next() throws IOExceptions. We should stop the loop in this case.
    Otherwise, we may go into a infinite loop.
    
    Without HADOOP-16685, it's also not guaranteed that
    FileSystem.listStatusIterator() will throw a FileNotFoundException when
    the path doesn't exist.
    
    This patch refactors the file listing iterators so we don't need to
    depend on these two assumptions. The basic idea is:
     - On one side, we should not depends on other RemoteIterator's behavior
       after exception.
     - On the other side, we try to make our own iterators more robust on
       transient sub-directories. So table loading won't be failed by them.
    
    Tests:
     - Loop test_insert_stress.py 100 times. Verified the non-existing
       subdirs are skipped and inserts are stable in a high concurrency.
    
    Change-Id: I859bd4f976c51a34eb6a03cefd2ddcdf11656cea
    Reviewed-on: http://gerrit.cloudera.org:8080/17171
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/common/FileSystemUtil.java   | 113 +++++++++++++++------
 1 file changed, 82 insertions(+), 31 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 592f74f..50e4ead 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.adl.AdlFileSystem;
@@ -375,6 +376,14 @@ public class FileSystemUtil {
   }
 
   /**
+   * Returns true if the FileSystem supports recursive listFiles (instead of using the
+   * base FileSystem.listFiles()). Currently only S3.
+   */
+  public static boolean hasRecursiveListFiles(FileSystem fs) {
+    return isS3AFileSystem(fs);
+  }
+
+  /**
    * Returns true iff the filesystem is a S3AFileSystem.
    */
   public static boolean isS3AFileSystem(FileSystem fs) {
@@ -686,7 +695,8 @@ public class FileSystemUtil {
           return listFiles(fs, p, true, debugAction);
         }
         DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
-        return new FilterIterator(p, new RecursingIterator(fs, p));
+        return new FilterIterator(p, new RecursingIterator<>(fs, p,
+            FileSystemUtil::listStatusIterator));
       }
       DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
       return new FilterIterator(p, listStatusIterator(fs, p));
@@ -703,7 +713,16 @@ public class FileSystemUtil {
       boolean recursive, String debugAction) throws IOException {
     try {
       DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
-      return new FilterIterator(p, fs.listFiles(p, recursive));
+      RemoteIterator<LocatedFileStatus> baseIterator;
+      // For fs that doesn't override FileSystem.listFiles(), use our RecursingIterator to
+      // survive from transient sub-directories.
+      if (hasRecursiveListFiles(fs)) {
+        baseIterator = fs.listFiles(p, recursive);
+      } else {
+        baseIterator = new RecursingIterator<>(fs, p,
+            FileSystemUtil::listLocatedStatusIterator);
+      }
+      return new FilterIterator(p, baseIterator);
     } catch (FileNotFoundException e) {
       if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
       return null;
@@ -719,7 +738,8 @@ public class FileSystemUtil {
   public static RemoteIterator<FileStatus> listStatusIterator(FileSystem fs, Path p)
       throws IOException {
     RemoteIterator<FileStatus> iterator = fs.listStatusIterator(p);
-    // Some FileSystem implementations like GoogleHadoopFileSystem doesn't check
+    // Before HADOOP-16685, some FileSystem implementations (e.g. AzureBlobFileSystem,
+    // GoogleHadoopFileSystem and S3AFileSystem(pre-HADOOP-17281)) don't check
     // existence of the start path when creating the RemoteIterator. Instead, their
     // iterators throw the FileNotFoundException in the first call of hasNext() when
     // the start path doesn't exist. Here we call hasNext() to ensure start path exists.
@@ -728,6 +748,20 @@ public class FileSystemUtil {
   }
 
   /**
+   * Wrapper around FileSystem.listLocatedStatus() to make sure the path exists.
+   *
+   * @throws FileNotFoundException if <code>p</code> does not exist
+   * @throws IOException if any I/O error occurredd
+   */
+  public static RemoteIterator<LocatedFileStatus> listLocatedStatusIterator(
+      FileSystem fs, Path p) throws IOException {
+    RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(p);
+    // Same as above, call hasNext() to ensure start path exists.
+    iterator.hasNext();
+    return iterator;
+  }
+
+  /**
    * Returns true if the path 'p' is a directory, false otherwise.
    */
   public static boolean isDir(Path p) throws IOException {
@@ -822,19 +856,9 @@ public class FileSystemUtil {
       // state)
       while (curFile_ == null) {
         FileStatus next;
-        try {
-          if (!baseIterator_.hasNext()) return false;
-          // if the next fileStatus is in ignored directory skip it
-           next = baseIterator_.next();
-        } catch (FileNotFoundException ex) {
-          // in case of concurrent operations by multiple engines it is possible that
-          // some temporary files are deleted while Impala is loading the table. For
-          // instance, hive deletes the temporary files in the .hive-staging directory
-          // after an insert query from Hive completes. If we are loading the table at
-          // the same time, we may get a FileNotFoundException which is safe to ignore.
-          LOG.warn(ex.getMessage());
-          continue;
-        }
+        if (!baseIterator_.hasNext()) return false;
+        next = baseIterator_.next();
+        // if the next fileStatus is in ignored directory skip it
         if (!isInIgnoredDirectory(startPath_, next)) {
           curFile_ = next;
           return true;
@@ -855,18 +879,35 @@ public class FileSystemUtil {
   }
 
   /**
-   * Iterator which recursively visits directories on a FileSystem, yielding
-   * files in an unspecified order.
+   * A function interface similar to java.util.BiFunction but allows throwing IOException.
    */
-  private static class RecursingIterator implements RemoteIterator<FileStatus> {
+  @FunctionalInterface
+  public interface BiFunctionWithException<T, U, R> {
+    R apply(T t, U u) throws IOException;
+  }
+
+  /**
+   * Iterator which recursively visits directories on a FileSystem, yielding
+   * files in an unspecified order. Some directories got from the current level listing
+   * may be found non-existing when we start to list them recursively. Such non-existing
+   * sub-directories will be skipped.
+   */
+  private static class RecursingIterator<T extends FileStatus>
+      implements RemoteIterator<T> {
+    private final BiFunctionWithException<FileSystem, Path, RemoteIterator<T>>
+        newIterFunc_;
     private final FileSystem fs_;
-    private final Stack<RemoteIterator<FileStatus>> iters_ = new Stack<>();
-    private RemoteIterator<FileStatus> curIter_;
-    private FileStatus curFile_;
+    private final Stack<RemoteIterator<T>> iters_ = new Stack<>();
+    private RemoteIterator<T> curIter_;
+    private T curFile_;
 
-    private RecursingIterator(FileSystem fs, Path startPath) throws IOException {
+    private RecursingIterator(FileSystem fs, Path startPath,
+        BiFunctionWithException<FileSystem, Path, RemoteIterator<T>> newIterFunc)
+        throws IOException {
       this.fs_ = Preconditions.checkNotNull(fs);
-      curIter_ = listStatusIterator(fs, Preconditions.checkNotNull(startPath));
+      this.newIterFunc_ = Preconditions.checkNotNull(newIterFunc);
+      Preconditions.checkNotNull(startPath);
+      curIter_ = newIterFunc.apply(fs, startPath);
     }
 
     @Override
@@ -876,8 +917,17 @@ public class FileSystemUtil {
       // state)
       while (curFile_ == null) {
         if (curIter_.hasNext()) {
-          // Consume the next file or directory from the current iterator.
-          handleFileStat(curIter_.next());
+          T fileStat = curIter_.next();
+          try {
+            handleFileStat(fileStat);
+          } catch (FileNotFoundException e) {
+            // in case of concurrent operations by multiple engines it is possible that
+            // some temporary files are deleted while Impala is loading the table. For
+            // instance, hive deletes the temporary files in the .hive-staging directory
+            // after an insert query from Hive completes. If we are loading the table at
+            // the same time, we may get a FileNotFoundException which is safe to ignore.
+            LOG.warn("Ignoring non-existing sub dir", e);
+          }
         } else if (!iters_.empty()) {
           // We ran out of entries in the current one, but we might still have
           // entries at a higher level of recursion.
@@ -898,21 +948,22 @@ public class FileSystemUtil {
      * @param fileStatus input status
      * @throws IOException if any IO error occurs
      */
-    private void handleFileStat(FileStatus fileStatus) throws IOException {
+    private void handleFileStat(T fileStatus) throws IOException {
       if (fileStatus.isFile()) {
         curFile_ = fileStatus;
         return;
       }
-      RemoteIterator<FileStatus> iter = listStatusIterator(fs_, fileStatus.getPath());
+      // Get sub iterator before updating curIter_ in case it throws exceptions.
+      RemoteIterator<T> subIter = newIterFunc_.apply(fs_, fileStatus.getPath());
       iters_.push(curIter_);
-      curIter_ = iter;
+      curIter_ = subIter;
       curFile_ = fileStatus;
     }
 
     @Override
-    public FileStatus next() throws IOException {
+    public T next() throws IOException {
       if (hasNext()) {
-        FileStatus result = curFile_;
+        T result = curFile_;
         // Reset back to 'null' so that hasNext() will pull a new entry on the next
         // call.
         curFile_ = null;