You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2019/01/04 06:54:07 UTC

[GitHub] gparai closed pull request #1590: DRILL-6931: File listing: fix issue for S3 directory objects and improve performance for recursive listing

gparai closed pull request #1590: DRILL-6931: File listing: fix issue for S3 directory objects and improve performance for recursive listing
URL: https://github.com/apache/drill/pull/1590
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not display it otherwise:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
index 9782bbff6b8..3398340574c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
@@ -50,7 +50,7 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
     SchemaPlus drillSchema = defaultSchema;
     SqlShowFiles showFiles = unwrap(sqlNode, SqlShowFiles.class);
     SqlIdentifier from = showFiles.getDb();
-    String fromDir = "./";
+    String fromDir = null;
 
     // Show files can be used without from clause, in which case we display the files in the default schema
     if (from != null) {
@@ -61,7 +61,7 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
         // Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause.
         drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names.subList(0, from.names.size() - 1));
         // Listing for specific directory: show files in dfs.tmp.specific_directory
-        fromDir = fromDir + from.names.get((from.names.size() - 1));
+        fromDir = from.names.get((from.names.size() - 1));
       }
 
       if (drillSchema == null) {
@@ -81,7 +81,9 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
           .build(logger);
     }
 
-    Path path = new Path(wsSchema.getDefaultLocation(), fromDir);
+    Path endPath = fromDir == null ? new Path(wsSchema.getDefaultLocation()) : new Path(wsSchema.getDefaultLocation(), fromDir);
+    // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method)
+    Path path = new Path(wsSchema.getFS().getUri().toString(), endPath);
     List<ShowFilesCommandResult> records = FileSystemUtil.listAllSafe(wsSchema.getFS(), path, false).stream()
         // use ShowFilesCommandResult for backward compatibility
         .map(fileStatus -> new ShowFilesCommandResult(new Records.File(wsSchema.getFullSchemaName(), wsSchema, fileStatus)))
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index 1e728403d31..bb49e171e4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -435,7 +435,8 @@ public void visitFiles(String schemaName, SchemaPlus schemaPlus) {
           String defaultLocation = wsSchema.getDefaultLocation();
           FileSystem fs = wsSchema.getFS();
           boolean recursive = optionManager.getBoolean(ExecConstants.LIST_FILES_RECURSIVELY);
-          FileSystemUtil.listAllSafe(fs, new Path(defaultLocation), recursive).forEach(
+          // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method)
+          FileSystemUtil.listAllSafe(fs, new Path(fs.getUri().toString(), defaultLocation), recursive).forEach(
               fileStatus -> records.add(new Records.File(schemaName, wsSchema, fileStatus))
           );
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
index 47ac44c0755..82500da30f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.util;
 
+import org.apache.drill.common.exceptions.ErrorHelper;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -25,7 +26,12 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -41,6 +47,15 @@
    */
   public static final PathFilter DUMMY_FILTER = path -> true;
 
+  /**
+   * Indicates which file system objects should be returned during listing.
+   */
+  private enum Scope {
+    DIRECTORIES,
+    FILES,
+    ALL
+  }
+
   /**
    * Returns statuses of all directories present in given path applying custom filters if present.
    * Will also include nested directories if recursive flag is set to true.
@@ -51,10 +66,8 @@
    * @param filters list of custom filters (optional)
    * @return list of matching directory statuses
    */
-  public static List<FileStatus> listDirectories(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
-    List<FileStatus> statuses = new ArrayList<>();
-    listDirectories(fs, path, recursive, false, statuses, mergeFilters(filters));
-    return statuses;
+  public static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    return list(fs, path, Scope.DIRECTORIES, recursive, false, filters);
   }
 
   /**
@@ -68,14 +81,13 @@
    * @param filters list of custom filters (optional)
    * @return list of matching directory statuses
    */
-  public static List<FileStatus> listDirectoriesSafe(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
-    List<FileStatus> statuses = new ArrayList<>();
+  public static List<FileStatus> listDirectoriesSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
     try {
-      listDirectories(fs, path, recursive, true, statuses, mergeFilters(filters));
+      return list(fs, path, Scope.DIRECTORIES, recursive, true, filters);
     } catch (Exception e) {
       // all exceptions are ignored
+      return Collections.emptyList();
     }
-    return statuses;
   }
 
   /**
@@ -89,9 +101,7 @@
    * @return list of matching file statuses
    */
   public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
-    List<FileStatus> statuses = new ArrayList<>();
-    listFiles(fs, path, recursive, false, statuses, mergeFilters(filters));
-    return statuses;
+    return list(fs, path, Scope.FILES, recursive, false, filters);
   }
 
   /**
@@ -105,13 +115,12 @@
    * @return list of matching file statuses
    */
   public static List<FileStatus> listFilesSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
-    List<FileStatus> statuses = new ArrayList<>();
     try {
-      listFiles(fs, path, recursive, true, statuses, mergeFilters(filters));
+      return list(fs, path, Scope.FILES, recursive, true, filters);
     } catch (Exception e) {
       // all exceptions are ignored
+      return Collections.emptyList();
     }
-    return statuses;
   }
 
   /**
@@ -125,9 +134,7 @@
    * @return list of matching directory and file statuses
    */
   public static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
-    List<FileStatus> statuses = new ArrayList<>();
-    listAll(fs, path, recursive, false, statuses, mergeFilters(filters));
-    return statuses;
+    return list(fs, path, Scope.ALL, recursive, false, filters);
   }
 
   /**
@@ -142,13 +149,12 @@
    * @return list of matching directory and file statuses
    */
   public static List<FileStatus> listAllSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
-    List<FileStatus> statuses = new ArrayList<>();
     try {
-      listAll(fs, path, recursive, true, statuses, mergeFilters(filters));
+      return list(fs, path, Scope.ALL, recursive, true, filters);
     } catch (Exception e) {
       // all exceptions are ignored
+      return Collections.emptyList();
     }
-    return statuses;
   }
 
   /**
@@ -177,7 +183,7 @@ public static PathFilter mergeFilters(PathFilter filter, PathFilter[] filters) {
    * @param filters array of filters
    * @return one filter that combines all given filters
    */
-  public static PathFilter mergeFilters(final PathFilter... filters) {
+  public static PathFilter mergeFilters(PathFilter... filters) {
     if (filters.length == 0) {
       return DUMMY_FILTER;
     }
@@ -186,103 +192,141 @@ public static PathFilter mergeFilters(final PathFilter... filters) {
   }
 
   /**
-   * Helper method that will store in given holder statuses of all directories present in given path applying custom filter.
-   * If recursive flag is set to true, will call itself recursively to add statuses of nested directories.
-   * If suppress exceptions flag is set to true, will ignore all exceptions during listing.
+   * Helper method that merges given filters into one and
+   * determines which listing method should be called based on recursive flag value.
    *
-   * @param fs current file system
-   * @param path path to directory
-   * @param recursive true if nested directories should be included
-   * @param suppressExceptions indicates if exceptions should be ignored during listing
-   * @param statuses holder for directory statuses
-   * @param filter custom filter
-   * @return holder with all matching directory statuses
+   * @param fs file system
+   * @param path path to file or directory
+   * @param scope file system objects scope
+   * @param recursive indicates if listing should be done recursively
+   * @param suppressExceptions indicates if exceptions should be ignored
+   * @param filters filters to be applied
+   * @return list of file statuses
    */
-  private static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions,
-                                                  List<FileStatus> statuses, PathFilter filter) throws IOException {
-    try {
-      for (FileStatus status : fs.listStatus(path, filter)) {
-        if (status.isDirectory()) {
-          statuses.add(status);
-          if (recursive) {
-            listDirectories(fs, status.getPath(), true, suppressExceptions, statuses, filter);
-          }
-        }
-      }
-    } catch (Exception e) {
-      if (suppressExceptions) {
-        logger.debug("Exception during listing file statuses", e);
-      } else {
-        throw e;
-      }
-    }
-    return statuses;
+  private static List<FileStatus> list(FileSystem fs, Path path, Scope scope, boolean recursive, boolean suppressExceptions, PathFilter... filters) throws IOException {
+    PathFilter filter = mergeFilters(filters);
+    return recursive ? listRecursive(fs, path, scope, suppressExceptions, filter)
+      : listNonRecursive(fs, path, scope, suppressExceptions, filter);
   }
 
   /**
-   * Helper method that will store in given holder statuses of all files present in given path applying custom filter.
-   * If recursive flag is set to true, will call itself recursively to add file statuses from nested directories.
-   * If suppress exceptions flag is set to true, will ignore all exceptions during listing.
+   * Lists file statuses non-recursively based on given file system objects {@link Scope}.
    *
-   * @param fs current file system
+   * @param fs file system
    * @param path path to file or directory
-   * @param recursive true if files in nested directories should be included
-   * @param suppressExceptions indicates if exceptions should be ignored during listing
-   * @param statuses holder for file statuses
-   * @param filter custom filter
-   * @return holder with all matching file statuses
+   * @param scope file system objects scope
+   * @param suppressExceptions indicates if exceptions should be ignored
+   * @param filter filter to be applied
+   * @return list of file statuses
    */
-  private static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions,
-                                            List<FileStatus> statuses, PathFilter filter) throws IOException {
+  private static List<FileStatus> listNonRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) throws IOException {
     try {
-      for (FileStatus status : fs.listStatus(path, filter)) {
-        if (status.isDirectory()) {
-          if (recursive) {
-            listFiles(fs, status.getPath(), true, suppressExceptions, statuses, filter);
-          }
-        } else {
-          statuses.add(status);
-        }
-      }
+      return Stream.of(fs.listStatus(path, filter))
+        .filter(status -> isStatusApplicable(status, scope))
+        .collect(Collectors.toList());
     } catch (Exception e) {
       if (suppressExceptions) {
         logger.debug("Exception during listing file statuses", e);
+        return Collections.emptyList();
       } else {
         throw e;
       }
     }
-    return statuses;
   }
 
   /**
-   * Helper method that will store in given holder statuses of all directories and files present in given path applying custom filter.
-   * If recursive flag is set to true, will call itself recursively to add nested directories and their file statuses.
-   * If suppress exceptions flag is set to true, will ignore all exceptions during listing.
+   * Lists file statuses recursively based on given file system objects {@link Scope}.
+   * Uses {@link ForkJoinPool} executor service and {@link RecursiveListing} task
+   * to parallel and speed up listing.
    *
-   * @param fs current file system
+   * @param fs file system
    * @param path path to file or directory
-   * @param recursive true if nested directories and their files should be included
-   * @param suppressExceptions indicates if exceptions should be ignored during listing
-   * @param statuses holder for directory and file statuses
-   * @param filter custom filter
-   * @return holder with all matching directory and file statuses
+   * @param scope file system objects scope
+   * @param suppressExceptions indicates if exceptions should be ignored
+   * @param filter filter to be applied
+   * @return list of file statuses
    */
-  private static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions,
-                                          List<FileStatus> statuses, PathFilter filter) throws IOException {
+  private static List<FileStatus> listRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
+    ForkJoinPool pool = new ForkJoinPool();
     try {
-      for (FileStatus status : fs.listStatus(path, filter)) {
-        statuses.add(status);
-        if (status.isDirectory() && recursive) {
-          listAll(fs, status.getPath(), true, suppressExceptions, statuses, filter);
+      RecursiveListing task = new RecursiveListing(fs, path, scope, suppressExceptions, filter);
+      return pool.invoke(task);
+    } finally {
+      pool.shutdown();
+    }
+  }
+
+  /**
+   * Checks if file status is applicable based on file system object {@link Scope}.
+   *
+   * @param status file status
+   * @param scope file system objects scope
+   * @return true if status is applicable, false otherwise
+   */
+  private static boolean isStatusApplicable(FileStatus status, Scope scope) {
+    switch (scope) {
+      case DIRECTORIES:
+        return status.isDirectory();
+      case FILES:
+        return status.isFile();
+      case ALL:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Task that parallels file status listing for each nested directory,
+   * gathers and returns common list of file statuses.
+   */
+  private static class RecursiveListing extends RecursiveTask<List<FileStatus>> {
+
+    private final FileSystem fs;
+    private final Path path;
+    private final Scope scope;
+    private final boolean suppressExceptions;
+    private final PathFilter filter;
+
+    RecursiveListing(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
+      this.fs = fs;
+      this.path = path;
+      this.scope = scope;
+      this.suppressExceptions = suppressExceptions;
+      this.filter = filter;
+    }
+
+    @Override
+    protected List<FileStatus> compute() {
+      List<FileStatus> statuses = new ArrayList<>();
+      List<RecursiveListing> tasks = new ArrayList<>();
+
+      try {
+        for (FileStatus status : fs.listStatus(path, filter)) {
+          if (isStatusApplicable(status, scope)) {
+            statuses.add(status);
+          }
+          if (status.isDirectory()) {
+            RecursiveListing task = new RecursiveListing(fs, status.getPath(), scope, suppressExceptions, filter);
+            task.fork();
+            tasks.add(task);
+          }
+        }
+      } catch (Exception e) {
+        if (suppressExceptions) {
+          logger.debug("Exception during listing file statuses", e);
+        } else {
+          // is used to re-throw checked exception
+          ErrorHelper.sneakyThrow(e);
         }
       }
-    } catch (Exception e) {
-      if (suppressExceptions) {
-        logger.debug("Exception during listing file statuses", e);
-      } else {
-        throw e;
-      }
+
+      tasks.stream()
+        .map(ForkJoinTask::join)
+        .forEach(statuses::addAll);
+
+      return statuses;
     }
-    return statuses;
   }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services