You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/31 08:52:19 UTC
[impala] branch master updated: IMPALA-10579: Fix usage of
RemoteIterator in FileSystemUtil
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 1d839e4 IMPALA-10579: Fix usage of RemoteIterator in FileSystemUtil
1d839e4 is described below
commit 1d839e423e51b05314e3dbfd790cb1fa7fc82d98
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Thu Mar 11 10:10:28 2021 +0800
IMPALA-10579: Fix usage of RemoteIterator in FileSystemUtil
HDFS FileSystem provides a listStatusIterator() API for listing remote
storage using a RemoteIterator. We use it to list files when loading
table file metadata.
It's not guaranteed that a RemoteIterator can survive when its hasNext()
or next() throws IOExceptions. We should stop the loop in this case.
Otherwise, we may go into an infinite loop.
Without HADOOP-16685, it's also not guaranteed that
FileSystem.listStatusIterator() will throw a FileNotFoundException when
the path doesn't exist.
This patch refactors the file listing iterators so we don't need to
depend on these two assumptions. The basic idea is:
- On one side, we should not depend on the underlying RemoteIterator's
behavior after an exception.
- On the other side, we try to make our own iterators more robust against
transient sub-directories, so table loading won't fail because of them.
Tests:
- Loop test_insert_stress.py 100 times. Verified the non-existing
subdirs are skipped and inserts are stable under high concurrency.
Change-Id: I859bd4f976c51a34eb6a03cefd2ddcdf11656cea
Reviewed-on: http://gerrit.cloudera.org:8080/17171
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../org/apache/impala/common/FileSystemUtil.java | 113 +++++++++++++++------
1 file changed, 82 insertions(+), 31 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 592f74f..50e4ead 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.adl.AdlFileSystem;
@@ -375,6 +376,14 @@ public class FileSystemUtil {
}
/**
+ * Returns true if the FileSystem supports recursive listFiles (instead of using the
+ * base FileSystem.listFiles()). Currently only S3.
+ */
+ public static boolean hasRecursiveListFiles(FileSystem fs) {
+ return isS3AFileSystem(fs);
+ }
+
+ /**
* Returns true iff the filesystem is a S3AFileSystem.
*/
public static boolean isS3AFileSystem(FileSystem fs) {
@@ -686,7 +695,8 @@ public class FileSystemUtil {
return listFiles(fs, p, true, debugAction);
}
DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
- return new FilterIterator(p, new RecursingIterator(fs, p));
+ return new FilterIterator(p, new RecursingIterator<>(fs, p,
+ FileSystemUtil::listStatusIterator));
}
DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
return new FilterIterator(p, listStatusIterator(fs, p));
@@ -703,7 +713,16 @@ public class FileSystemUtil {
boolean recursive, String debugAction) throws IOException {
try {
DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
- return new FilterIterator(p, fs.listFiles(p, recursive));
+ RemoteIterator<LocatedFileStatus> baseIterator;
+ // For fs that doesn't override FileSystem.listFiles(), use our RecursingIterator to
+ // survive from transient sub-directories.
+ if (hasRecursiveListFiles(fs)) {
+ baseIterator = fs.listFiles(p, recursive);
+ } else {
+ baseIterator = new RecursingIterator<>(fs, p,
+ FileSystemUtil::listLocatedStatusIterator);
+ }
+ return new FilterIterator(p, baseIterator);
} catch (FileNotFoundException e) {
if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
return null;
@@ -719,7 +738,8 @@ public class FileSystemUtil {
public static RemoteIterator<FileStatus> listStatusIterator(FileSystem fs, Path p)
throws IOException {
RemoteIterator<FileStatus> iterator = fs.listStatusIterator(p);
- // Some FileSystem implementations like GoogleHadoopFileSystem doesn't check
+ // Before HADOOP-16685, some FileSystem implementations (e.g. AzureBlobFileSystem,
+ // GoogleHadoopFileSystem and S3AFileSystem(pre-HADOOP-17281)) don't check
// existence of the start path when creating the RemoteIterator. Instead, their
// iterators throw the FileNotFoundException in the first call of hasNext() when
// the start path doesn't exist. Here we call hasNext() to ensure start path exists.
@@ -728,6 +748,20 @@ public class FileSystemUtil {
}
/**
+ * Wrapper around FileSystem.listLocatedStatus() to make sure the path exists.
+ *
+ * @throws FileNotFoundException if <code>p</code> does not exist
+ * @throws IOException if any I/O error occurred
+ */
+ public static RemoteIterator<LocatedFileStatus> listLocatedStatusIterator(
+ FileSystem fs, Path p) throws IOException {
+ RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(p);
+ // Same as above, call hasNext() to ensure start path exists.
+ iterator.hasNext();
+ return iterator;
+ }
+
+ /**
* Returns true if the path 'p' is a directory, false otherwise.
*/
public static boolean isDir(Path p) throws IOException {
@@ -822,19 +856,9 @@ public class FileSystemUtil {
// state)
while (curFile_ == null) {
FileStatus next;
- try {
- if (!baseIterator_.hasNext()) return false;
- // if the next fileStatus is in ignored directory skip it
- next = baseIterator_.next();
- } catch (FileNotFoundException ex) {
- // in case of concurrent operations by multiple engines it is possible that
- // some temporary files are deleted while Impala is loading the table. For
- // instance, hive deletes the temporary files in the .hive-staging directory
- // after an insert query from Hive completes. If we are loading the table at
- // the same time, we may get a FileNotFoundException which is safe to ignore.
- LOG.warn(ex.getMessage());
- continue;
- }
+ if (!baseIterator_.hasNext()) return false;
+ next = baseIterator_.next();
+ // if the next fileStatus is in ignored directory skip it
if (!isInIgnoredDirectory(startPath_, next)) {
curFile_ = next;
return true;
@@ -855,18 +879,35 @@ public class FileSystemUtil {
}
/**
- * Iterator which recursively visits directories on a FileSystem, yielding
- * files in an unspecified order.
+ * A function interface similar to java.util.BiFunction but allows throwing IOException.
*/
- private static class RecursingIterator implements RemoteIterator<FileStatus> {
+ @FunctionalInterface
+ public interface BiFunctionWithException<T, U, R> {
+ R apply(T t, U u) throws IOException;
+ }
+
+ /**
+ * Iterator which recursively visits directories on a FileSystem, yielding
+ * files in an unspecified order. Some directories got from the current level listing
+ * may be found non-existing when we start to list them recursively. Such non-existing
+ * sub-directories will be skipped.
+ */
+ private static class RecursingIterator<T extends FileStatus>
+ implements RemoteIterator<T> {
+ private final BiFunctionWithException<FileSystem, Path, RemoteIterator<T>>
+ newIterFunc_;
private final FileSystem fs_;
- private final Stack<RemoteIterator<FileStatus>> iters_ = new Stack<>();
- private RemoteIterator<FileStatus> curIter_;
- private FileStatus curFile_;
+ private final Stack<RemoteIterator<T>> iters_ = new Stack<>();
+ private RemoteIterator<T> curIter_;
+ private T curFile_;
- private RecursingIterator(FileSystem fs, Path startPath) throws IOException {
+ private RecursingIterator(FileSystem fs, Path startPath,
+ BiFunctionWithException<FileSystem, Path, RemoteIterator<T>> newIterFunc)
+ throws IOException {
this.fs_ = Preconditions.checkNotNull(fs);
- curIter_ = listStatusIterator(fs, Preconditions.checkNotNull(startPath));
+ this.newIterFunc_ = Preconditions.checkNotNull(newIterFunc);
+ Preconditions.checkNotNull(startPath);
+ curIter_ = newIterFunc.apply(fs, startPath);
}
@Override
@@ -876,8 +917,17 @@ public class FileSystemUtil {
// state)
while (curFile_ == null) {
if (curIter_.hasNext()) {
- // Consume the next file or directory from the current iterator.
- handleFileStat(curIter_.next());
+ T fileStat = curIter_.next();
+ try {
+ handleFileStat(fileStat);
+ } catch (FileNotFoundException e) {
+ // in case of concurrent operations by multiple engines it is possible that
+ // some temporary files are deleted while Impala is loading the table. For
+ // instance, hive deletes the temporary files in the .hive-staging directory
+ // after an insert query from Hive completes. If we are loading the table at
+ // the same time, we may get a FileNotFoundException which is safe to ignore.
+ LOG.warn("Ignoring non-existing sub dir", e);
+ }
} else if (!iters_.empty()) {
// We ran out of entries in the current one, but we might still have
// entries at a higher level of recursion.
@@ -898,21 +948,22 @@ public class FileSystemUtil {
* @param fileStatus input status
* @throws IOException if any IO error occurs
*/
- private void handleFileStat(FileStatus fileStatus) throws IOException {
+ private void handleFileStat(T fileStatus) throws IOException {
if (fileStatus.isFile()) {
curFile_ = fileStatus;
return;
}
- RemoteIterator<FileStatus> iter = listStatusIterator(fs_, fileStatus.getPath());
+ // Get sub iterator before updating curIter_ in case it throws exceptions.
+ RemoteIterator<T> subIter = newIterFunc_.apply(fs_, fileStatus.getPath());
iters_.push(curIter_);
- curIter_ = iter;
+ curIter_ = subIter;
curFile_ = fileStatus;
}
@Override
- public FileStatus next() throws IOException {
+ public T next() throws IOException {
if (hasNext()) {
- FileStatus result = curFile_;
+ T result = curFile_;
// Reset back to 'null' so that hasNext() will pull a new entry on the next
// call.
curFile_ = null;