You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by gp...@apache.org on 2019/01/04 06:53:58 UTC
[drill] 03/10: DRILL-6931: File listing: fix issue for S3 directory
objects and improve performance for recursive listing closes #1590
This is an automated email from the ASF dual-hosted git repository.
gparai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 7108f162cd4f18121aa9a8ace76326bd5fbf8264
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Fri Dec 28 19:45:38 2018 +0200
DRILL-6931: File listing: fix issue for S3 directory objects and improve performance for recursive listing
closes #1590
---
.../planner/sql/handlers/ShowFilesHandler.java | 8 +-
.../store/ischema/InfoSchemaRecordGenerator.java | 3 +-
.../org/apache/drill/exec/util/FileSystemUtil.java | 230 ++++++++++++---------
3 files changed, 144 insertions(+), 97 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
index 9782bbf..3398340 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
@@ -50,7 +50,7 @@ public class ShowFilesHandler extends DefaultSqlHandler {
SchemaPlus drillSchema = defaultSchema;
SqlShowFiles showFiles = unwrap(sqlNode, SqlShowFiles.class);
SqlIdentifier from = showFiles.getDb();
- String fromDir = "./";
+ String fromDir = null;
// Show files can be used without from clause, in which case we display the files in the default schema
if (from != null) {
@@ -61,7 +61,7 @@ public class ShowFilesHandler extends DefaultSqlHandler {
// Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause.
drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names.subList(0, from.names.size() - 1));
// Listing for specific directory: show files in dfs.tmp.specific_directory
- fromDir = fromDir + from.names.get((from.names.size() - 1));
+ fromDir = from.names.get((from.names.size() - 1));
}
if (drillSchema == null) {
@@ -81,7 +81,9 @@ public class ShowFilesHandler extends DefaultSqlHandler {
.build(logger);
}
- Path path = new Path(wsSchema.getDefaultLocation(), fromDir);
+ Path endPath = fromDir == null ? new Path(wsSchema.getDefaultLocation()) : new Path(wsSchema.getDefaultLocation(), fromDir);
+ // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method)
+ Path path = new Path(wsSchema.getFS().getUri().toString(), endPath);
List<ShowFilesCommandResult> records = FileSystemUtil.listAllSafe(wsSchema.getFS(), path, false).stream()
// use ShowFilesCommandResult for backward compatibility
.map(fileStatus -> new ShowFilesCommandResult(new Records.File(wsSchema.getFullSchemaName(), wsSchema, fileStatus)))
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index 1e72840..bb49e17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -435,7 +435,8 @@ public abstract class InfoSchemaRecordGenerator<S> {
String defaultLocation = wsSchema.getDefaultLocation();
FileSystem fs = wsSchema.getFS();
boolean recursive = optionManager.getBoolean(ExecConstants.LIST_FILES_RECURSIVELY);
- FileSystemUtil.listAllSafe(fs, new Path(defaultLocation), recursive).forEach(
+ // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method)
+ FileSystemUtil.listAllSafe(fs, new Path(fs.getUri().toString(), defaultLocation), recursive).forEach(
fileStatus -> records.add(new Records.File(schemaName, wsSchema, fileStatus))
);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
index 47ac44c..82500da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.util;
+import org.apache.drill.common.exceptions.ErrorHelper;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -25,7 +26,12 @@ import org.apache.hadoop.fs.PathFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -42,6 +48,15 @@ public class FileSystemUtil {
public static final PathFilter DUMMY_FILTER = path -> true;
/**
+ * Indicates which file system objects should be returned during listing.
+ */
+ private enum Scope {
+ DIRECTORIES,
+ FILES,
+ ALL
+ }
+
+ /**
* Returns statuses of all directories present in given path applying custom filters if present.
* Will also include nested directories if recursive flag is set to true.
*
@@ -51,10 +66,8 @@ public class FileSystemUtil {
* @param filters list of custom filters (optional)
* @return list of matching directory statuses
*/
- public static List<FileStatus> listDirectories(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
- List<FileStatus> statuses = new ArrayList<>();
- listDirectories(fs, path, recursive, false, statuses, mergeFilters(filters));
- return statuses;
+ public static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+ return list(fs, path, Scope.DIRECTORIES, recursive, false, filters);
}
/**
@@ -68,14 +81,13 @@ public class FileSystemUtil {
* @param filters list of custom filters (optional)
* @return list of matching directory statuses
*/
- public static List<FileStatus> listDirectoriesSafe(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
- List<FileStatus> statuses = new ArrayList<>();
+ public static List<FileStatus> listDirectoriesSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
try {
- listDirectories(fs, path, recursive, true, statuses, mergeFilters(filters));
+ return list(fs, path, Scope.DIRECTORIES, recursive, true, filters);
} catch (Exception e) {
// all exceptions are ignored
+ return Collections.emptyList();
}
- return statuses;
}
/**
@@ -89,9 +101,7 @@ public class FileSystemUtil {
* @return list of matching file statuses
*/
public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
- List<FileStatus> statuses = new ArrayList<>();
- listFiles(fs, path, recursive, false, statuses, mergeFilters(filters));
- return statuses;
+ return list(fs, path, Scope.FILES, recursive, false, filters);
}
/**
@@ -105,13 +115,12 @@ public class FileSystemUtil {
* @return list of matching file statuses
*/
public static List<FileStatus> listFilesSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
- List<FileStatus> statuses = new ArrayList<>();
try {
- listFiles(fs, path, recursive, true, statuses, mergeFilters(filters));
+ return list(fs, path, Scope.FILES, recursive, true, filters);
} catch (Exception e) {
// all exceptions are ignored
+ return Collections.emptyList();
}
- return statuses;
}
/**
@@ -125,9 +134,7 @@ public class FileSystemUtil {
* @return list of matching directory and file statuses
*/
public static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
- List<FileStatus> statuses = new ArrayList<>();
- listAll(fs, path, recursive, false, statuses, mergeFilters(filters));
- return statuses;
+ return list(fs, path, Scope.ALL, recursive, false, filters);
}
/**
@@ -142,13 +149,12 @@ public class FileSystemUtil {
* @return list of matching directory and file statuses
*/
public static List<FileStatus> listAllSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
- List<FileStatus> statuses = new ArrayList<>();
try {
- listAll(fs, path, recursive, true, statuses, mergeFilters(filters));
+ return list(fs, path, Scope.ALL, recursive, true, filters);
} catch (Exception e) {
// all exceptions are ignored
+ return Collections.emptyList();
}
- return statuses;
}
/**
@@ -177,7 +183,7 @@ public class FileSystemUtil {
* @param filters array of filters
* @return one filter that combines all given filters
*/
- public static PathFilter mergeFilters(final PathFilter... filters) {
+ public static PathFilter mergeFilters(PathFilter... filters) {
if (filters.length == 0) {
return DUMMY_FILTER;
}
@@ -186,103 +192,141 @@ public class FileSystemUtil {
}
/**
- * Helper method that will store in given holder statuses of all directories present in given path applying custom filter.
- * If recursive flag is set to true, will call itself recursively to add statuses of nested directories.
- * If suppress exceptions flag is set to true, will ignore all exceptions during listing.
+ * Helper method that merges given filters into one and
+ * determines which listing method should be called based on recursive flag value.
*
- * @param fs current file system
- * @param path path to directory
- * @param recursive true if nested directories should be included
- * @param suppressExceptions indicates if exceptions should be ignored during listing
- * @param statuses holder for directory statuses
- * @param filter custom filter
- * @return holder with all matching directory statuses
+ * @param fs file system
+ * @param path path to file or directory
+ * @param scope file system objects scope
+ * @param recursive indicates if listing should be done recursively
+ * @param suppressExceptions indicates if exceptions should be ignored
+ * @param filters filters to be applied
+ * @return list of file statuses
*/
- private static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions,
- List<FileStatus> statuses, PathFilter filter) throws IOException {
- try {
- for (FileStatus status : fs.listStatus(path, filter)) {
- if (status.isDirectory()) {
- statuses.add(status);
- if (recursive) {
- listDirectories(fs, status.getPath(), true, suppressExceptions, statuses, filter);
- }
- }
- }
- } catch (Exception e) {
- if (suppressExceptions) {
- logger.debug("Exception during listing file statuses", e);
- } else {
- throw e;
- }
- }
- return statuses;
+ private static List<FileStatus> list(FileSystem fs, Path path, Scope scope, boolean recursive, boolean suppressExceptions, PathFilter... filters) throws IOException {
+ PathFilter filter = mergeFilters(filters);
+ return recursive ? listRecursive(fs, path, scope, suppressExceptions, filter)
+ : listNonRecursive(fs, path, scope, suppressExceptions, filter);
}
/**
- * Helper method that will store in given holder statuses of all files present in given path applying custom filter.
- * If recursive flag is set to true, will call itself recursively to add file statuses from nested directories.
- * If suppress exceptions flag is set to true, will ignore all exceptions during listing.
+ * Lists file statuses non-recursively based on given file system objects {@link Scope}.
*
- * @param fs current file system
+ * @param fs file system
* @param path path to file or directory
- * @param recursive true if files in nested directories should be included
- * @param suppressExceptions indicates if exceptions should be ignored during listing
- * @param statuses holder for file statuses
- * @param filter custom filter
- * @return holder with all matching file statuses
+ * @param scope file system objects scope
+ * @param suppressExceptions indicates if exceptions should be ignored
+ * @param filter filter to be applied
+ * @return list of file statuses
*/
- private static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions,
- List<FileStatus> statuses, PathFilter filter) throws IOException {
+ private static List<FileStatus> listNonRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) throws IOException {
try {
- for (FileStatus status : fs.listStatus(path, filter)) {
- if (status.isDirectory()) {
- if (recursive) {
- listFiles(fs, status.getPath(), true, suppressExceptions, statuses, filter);
- }
- } else {
- statuses.add(status);
- }
- }
+ return Stream.of(fs.listStatus(path, filter))
+ .filter(status -> isStatusApplicable(status, scope))
+ .collect(Collectors.toList());
} catch (Exception e) {
if (suppressExceptions) {
logger.debug("Exception during listing file statuses", e);
+ return Collections.emptyList();
} else {
throw e;
}
}
- return statuses;
}
/**
- * Helper method that will store in given holder statuses of all directories and files present in given path applying custom filter.
- * If recursive flag is set to true, will call itself recursively to add nested directories and their file statuses.
- * If suppress exceptions flag is set to true, will ignore all exceptions during listing.
+ * Lists file statuses recursively based on given file system objects {@link Scope}.
+ * Uses {@link ForkJoinPool} executor service and {@link RecursiveListing} task
+ * to parallelize and speed up listing.
*
- * @param fs current file system
+ * @param fs file system
* @param path path to file or directory
- * @param recursive true if nested directories and their files should be included
- * @param suppressExceptions indicates if exceptions should be ignored during listing
- * @param statuses holder for directory and file statuses
- * @param filter custom filter
- * @return holder with all matching directory and file statuses
+ * @param scope file system objects scope
+ * @param suppressExceptions indicates if exceptions should be ignored
+ * @param filter filter to be applied
+ * @return list of file statuses
*/
- private static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions,
- List<FileStatus> statuses, PathFilter filter) throws IOException {
+ private static List<FileStatus> listRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
+ ForkJoinPool pool = new ForkJoinPool();
try {
- for (FileStatus status : fs.listStatus(path, filter)) {
- statuses.add(status);
- if (status.isDirectory() && recursive) {
- listAll(fs, status.getPath(), true, suppressExceptions, statuses, filter);
+ RecursiveListing task = new RecursiveListing(fs, path, scope, suppressExceptions, filter);
+ return pool.invoke(task);
+ } finally {
+ pool.shutdown();
+ }
+ }
+
+ /**
+ * Checks if file status is applicable based on file system object {@link Scope}.
+ *
+ * @param status file status
+ * @param scope file system objects scope
+ * @return true if status is applicable, false otherwise
+ */
+ private static boolean isStatusApplicable(FileStatus status, Scope scope) {
+ switch (scope) {
+ case DIRECTORIES:
+ return status.isDirectory();
+ case FILES:
+ return status.isFile();
+ case ALL:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Task that parallelizes file status listing for each nested directory,
+ * gathers and returns common list of file statuses.
+ */
+ private static class RecursiveListing extends RecursiveTask<List<FileStatus>> {
+
+ private final FileSystem fs;
+ private final Path path;
+ private final Scope scope;
+ private final boolean suppressExceptions;
+ private final PathFilter filter;
+
+ RecursiveListing(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
+ this.fs = fs;
+ this.path = path;
+ this.scope = scope;
+ this.suppressExceptions = suppressExceptions;
+ this.filter = filter;
+ }
+
+ @Override
+ protected List<FileStatus> compute() {
+ List<FileStatus> statuses = new ArrayList<>();
+ List<RecursiveListing> tasks = new ArrayList<>();
+
+ try {
+ for (FileStatus status : fs.listStatus(path, filter)) {
+ if (isStatusApplicable(status, scope)) {
+ statuses.add(status);
+ }
+ if (status.isDirectory()) {
+ RecursiveListing task = new RecursiveListing(fs, status.getPath(), scope, suppressExceptions, filter);
+ task.fork();
+ tasks.add(task);
+ }
+ }
+ } catch (Exception e) {
+ if (suppressExceptions) {
+ logger.debug("Exception during listing file statuses", e);
+ } else {
+ // is used to re-throw checked exception
+ ErrorHelper.sneakyThrow(e);
}
}
- } catch (Exception e) {
- if (suppressExceptions) {
- logger.debug("Exception during listing file statuses", e);
- } else {
- throw e;
- }
+
+ tasks.stream()
+ .map(ForkJoinTask::join)
+ .forEach(statuses::addAll);
+
+ return statuses;
}
- return statuses;
}
+
}