You are viewing a plain-text version of this content. The canonical link was a hyperlink in the original archive page and is not preserved in this plain-text export.
Posted to commits@drill.apache.org by gp...@apache.org on 2019/01/04 06:53:58 UTC

[drill] 03/10: DRILL-6931: File listing: fix issue for S3 directory objects and improve performance for recursive listing (closes #1590)

This is an automated email from the ASF dual-hosted git repository.

gparai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 7108f162cd4f18121aa9a8ace76326bd5fbf8264
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Fri Dec 28 19:45:38 2018 +0200

    DRILL-6931: File listing: fix issue for S3 directory objects and improve performance for recursive listing
    closes #1590
---
 .../planner/sql/handlers/ShowFilesHandler.java     |   8 +-
 .../store/ischema/InfoSchemaRecordGenerator.java   |   3 +-
 .../org/apache/drill/exec/util/FileSystemUtil.java | 230 ++++++++++++---------
 3 files changed, 144 insertions(+), 97 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
index 9782bbf..3398340 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
@@ -50,7 +50,7 @@ public class ShowFilesHandler extends DefaultSqlHandler {
     SchemaPlus drillSchema = defaultSchema;
     SqlShowFiles showFiles = unwrap(sqlNode, SqlShowFiles.class);
     SqlIdentifier from = showFiles.getDb();
-    String fromDir = "./";
+    String fromDir = null;
 
     // Show files can be used without from clause, in which case we display the files in the default schema
     if (from != null) {
@@ -61,7 +61,7 @@ public class ShowFilesHandler extends DefaultSqlHandler {
         // Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause.
         drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names.subList(0, from.names.size() - 1));
         // Listing for specific directory: show files in dfs.tmp.specific_directory
-        fromDir = fromDir + from.names.get((from.names.size() - 1));
+        fromDir = from.names.get((from.names.size() - 1));
       }
 
       if (drillSchema == null) {
@@ -81,7 +81,9 @@ public class ShowFilesHandler extends DefaultSqlHandler {
           .build(logger);
     }
 
-    Path path = new Path(wsSchema.getDefaultLocation(), fromDir);
+    Path endPath = fromDir == null ? new Path(wsSchema.getDefaultLocation()) : new Path(wsSchema.getDefaultLocation(), fromDir);
+    // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method)
+    Path path = new Path(wsSchema.getFS().getUri().toString(), endPath);
     List<ShowFilesCommandResult> records = FileSystemUtil.listAllSafe(wsSchema.getFS(), path, false).stream()
         // use ShowFilesCommandResult for backward compatibility
         .map(fileStatus -> new ShowFilesCommandResult(new Records.File(wsSchema.getFullSchemaName(), wsSchema, fileStatus)))
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index 1e72840..bb49e17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -435,7 +435,8 @@ public abstract class InfoSchemaRecordGenerator<S> {
           String defaultLocation = wsSchema.getDefaultLocation();
           FileSystem fs = wsSchema.getFS();
           boolean recursive = optionManager.getBoolean(ExecConstants.LIST_FILES_RECURSIVELY);
-          FileSystemUtil.listAllSafe(fs, new Path(defaultLocation), recursive).forEach(
+          // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method)
+          FileSystemUtil.listAllSafe(fs, new Path(fs.getUri().toString(), defaultLocation), recursive).forEach(
               fileStatus -> records.add(new Records.File(schemaName, wsSchema, fileStatus))
           );
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
index 47ac44c..82500da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.util;
 
+import org.apache.drill.common.exceptions.ErrorHelper;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -25,7 +26,12 @@ import org.apache.hadoop.fs.PathFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -42,6 +48,15 @@ public class FileSystemUtil {
   public static final PathFilter DUMMY_FILTER = path -> true;
 
   /**
+   * Indicates which file system objects should be returned during listing.
+   */
+  private enum Scope {
+    DIRECTORIES,
+    FILES,
+    ALL
+  }
+
+  /**
    * Returns statuses of all directories present in given path applying custom filters if present.
    * Will also include nested directories if recursive flag is set to true.
    *
@@ -51,10 +66,8 @@ public class FileSystemUtil {
    * @param filters list of custom filters (optional)
    * @return list of matching directory statuses
    */
-  public static List<FileStatus> listDirectories(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
-    List<FileStatus> statuses = new ArrayList<>();
-    listDirectories(fs, path, recursive, false, statuses, mergeFilters(filters));
-    return statuses;
+  public static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    return list(fs, path, Scope.DIRECTORIES, recursive, false, filters);
   }
 
   /**
@@ -68,14 +81,13 @@ public class FileSystemUtil {
    * @param filters list of custom filters (optional)
    * @return list of matching directory statuses
    */
-  public static List<FileStatus> listDirectoriesSafe(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
-    List<FileStatus> statuses = new ArrayList<>();
+  public static List<FileStatus> listDirectoriesSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
     try {
-      listDirectories(fs, path, recursive, true, statuses, mergeFilters(filters));
+      return list(fs, path, Scope.DIRECTORIES, recursive, true, filters);
     } catch (Exception e) {
       // all exceptions are ignored
+      return Collections.emptyList();
     }
-    return statuses;
   }
 
   /**
@@ -89,9 +101,7 @@ public class FileSystemUtil {
    * @return list of matching file statuses
    */
   public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
-    List<FileStatus> statuses = new ArrayList<>();
-    listFiles(fs, path, recursive, false, statuses, mergeFilters(filters));
-    return statuses;
+    return list(fs, path, Scope.FILES, recursive, false, filters);
   }
 
   /**
@@ -105,13 +115,12 @@ public class FileSystemUtil {
    * @return list of matching file statuses
    */
   public static List<FileStatus> listFilesSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
-    List<FileStatus> statuses = new ArrayList<>();
     try {
-      listFiles(fs, path, recursive, true, statuses, mergeFilters(filters));
+      return list(fs, path, Scope.FILES, recursive, true, filters);
     } catch (Exception e) {
       // all exceptions are ignored
+      return Collections.emptyList();
     }
-    return statuses;
   }
 
   /**
@@ -125,9 +134,7 @@ public class FileSystemUtil {
    * @return list of matching directory and file statuses
    */
   public static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
-    List<FileStatus> statuses = new ArrayList<>();
-    listAll(fs, path, recursive, false, statuses, mergeFilters(filters));
-    return statuses;
+    return list(fs, path, Scope.ALL, recursive, false, filters);
   }
 
   /**
@@ -142,13 +149,12 @@ public class FileSystemUtil {
    * @return list of matching directory and file statuses
    */
   public static List<FileStatus> listAllSafe(FileSystem fs, Path path, boolean recursive, PathFilter... filters) {
-    List<FileStatus> statuses = new ArrayList<>();
     try {
-      listAll(fs, path, recursive, true, statuses, mergeFilters(filters));
+      return list(fs, path, Scope.ALL, recursive, true, filters);
     } catch (Exception e) {
       // all exceptions are ignored
+      return Collections.emptyList();
     }
-    return statuses;
   }
 
   /**
@@ -177,7 +183,7 @@ public class FileSystemUtil {
    * @param filters array of filters
    * @return one filter that combines all given filters
    */
-  public static PathFilter mergeFilters(final PathFilter... filters) {
+  public static PathFilter mergeFilters(PathFilter... filters) {
     if (filters.length == 0) {
       return DUMMY_FILTER;
     }
@@ -186,103 +192,141 @@ public class FileSystemUtil {
   }
 
   /**
-   * Helper method that will store in given holder statuses of all directories present in given path applying custom filter.
-   * If recursive flag is set to true, will call itself recursively to add statuses of nested directories.
-   * If suppress exceptions flag is set to true, will ignore all exceptions during listing.
+   * Helper method that merges given filters into one and
+   * determines which listing method should be called based on recursive flag value.
    *
-   * @param fs current file system
-   * @param path path to directory
-   * @param recursive true if nested directories should be included
-   * @param suppressExceptions indicates if exceptions should be ignored during listing
-   * @param statuses holder for directory statuses
-   * @param filter custom filter
-   * @return holder with all matching directory statuses
+   * @param fs file system
+   * @param path path to file or directory
+   * @param scope file system objects scope
+   * @param recursive indicates if listing should be done recursively
+   * @param suppressExceptions indicates if exceptions should be ignored
+   * @param filters filters to be applied
+   * @return list of file statuses
    */
-  private static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions,
-                                                  List<FileStatus> statuses, PathFilter filter) throws IOException {
-    try {
-      for (FileStatus status : fs.listStatus(path, filter)) {
-        if (status.isDirectory()) {
-          statuses.add(status);
-          if (recursive) {
-            listDirectories(fs, status.getPath(), true, suppressExceptions, statuses, filter);
-          }
-        }
-      }
-    } catch (Exception e) {
-      if (suppressExceptions) {
-        logger.debug("Exception during listing file statuses", e);
-      } else {
-        throw e;
-      }
-    }
-    return statuses;
+  private static List<FileStatus> list(FileSystem fs, Path path, Scope scope, boolean recursive, boolean suppressExceptions, PathFilter... filters) throws IOException {
+    PathFilter filter = mergeFilters(filters);
+    return recursive ? listRecursive(fs, path, scope, suppressExceptions, filter)
+      : listNonRecursive(fs, path, scope, suppressExceptions, filter);
   }
 
   /**
-   * Helper method that will store in given holder statuses of all files present in given path applying custom filter.
-   * If recursive flag is set to true, will call itself recursively to add file statuses from nested directories.
-   * If suppress exceptions flag is set to true, will ignore all exceptions during listing.
+   * Lists file statuses non-recursively based on given file system objects {@link Scope}.
    *
-   * @param fs current file system
+   * @param fs file system
    * @param path path to file or directory
-   * @param recursive true if files in nested directories should be included
-   * @param suppressExceptions indicates if exceptions should be ignored during listing
-   * @param statuses holder for file statuses
-   * @param filter custom filter
-   * @return holder with all matching file statuses
+   * @param scope file system objects scope
+   * @param suppressExceptions indicates if exceptions should be ignored
+   * @param filter filter to be applied
+   * @return list of file statuses
    */
-  private static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions,
-                                            List<FileStatus> statuses, PathFilter filter) throws IOException {
+  private static List<FileStatus> listNonRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) throws IOException {
     try {
-      for (FileStatus status : fs.listStatus(path, filter)) {
-        if (status.isDirectory()) {
-          if (recursive) {
-            listFiles(fs, status.getPath(), true, suppressExceptions, statuses, filter);
-          }
-        } else {
-          statuses.add(status);
-        }
-      }
+      return Stream.of(fs.listStatus(path, filter))
+        .filter(status -> isStatusApplicable(status, scope))
+        .collect(Collectors.toList());
     } catch (Exception e) {
       if (suppressExceptions) {
         logger.debug("Exception during listing file statuses", e);
+        return Collections.emptyList();
       } else {
         throw e;
       }
     }
-    return statuses;
   }
 
   /**
-   * Helper method that will store in given holder statuses of all directories and files present in given path applying custom filter.
-   * If recursive flag is set to true, will call itself recursively to add nested directories and their file statuses.
-   * If suppress exceptions flag is set to true, will ignore all exceptions during listing.
+   * Lists file statuses recursively based on given file system objects {@link Scope}.
+   * Uses {@link ForkJoinPool} executor service and {@link RecursiveListing} task
+   * to parallelize and speed up listing.
    *
-   * @param fs current file system
+   * @param fs file system
    * @param path path to file or directory
-   * @param recursive true if nested directories and their files should be included
-   * @param suppressExceptions indicates if exceptions should be ignored during listing
-   * @param statuses holder for directory and file statuses
-   * @param filter custom filter
-   * @return holder with all matching directory and file statuses
+   * @param scope file system objects scope
+   * @param suppressExceptions indicates if exceptions should be ignored
+   * @param filter filter to be applied
+   * @return list of file statuses
    */
-  private static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, boolean suppressExceptions,
-                                          List<FileStatus> statuses, PathFilter filter) throws IOException {
+  private static List<FileStatus> listRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
+    ForkJoinPool pool = new ForkJoinPool();
     try {
-      for (FileStatus status : fs.listStatus(path, filter)) {
-        statuses.add(status);
-        if (status.isDirectory() && recursive) {
-          listAll(fs, status.getPath(), true, suppressExceptions, statuses, filter);
+      RecursiveListing task = new RecursiveListing(fs, path, scope, suppressExceptions, filter);
+      return pool.invoke(task);
+    } finally {
+      pool.shutdown();
+    }
+  }
+
+  /**
+   * Checks if file status is applicable based on file system object {@link Scope}.
+   *
+   * @param status file status
+   * @param scope file system objects scope
+   * @return true if status is applicable, false otherwise
+   */
+  private static boolean isStatusApplicable(FileStatus status, Scope scope) {
+    switch (scope) {
+      case DIRECTORIES:
+        return status.isDirectory();
+      case FILES:
+        return status.isFile();
+      case ALL:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Task that parallelizes file status listing for each nested directory,
+   * gathers and returns common list of file statuses.
+   */
+  private static class RecursiveListing extends RecursiveTask<List<FileStatus>> {
+
+    private final FileSystem fs;
+    private final Path path;
+    private final Scope scope;
+    private final boolean suppressExceptions;
+    private final PathFilter filter;
+
+    RecursiveListing(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
+      this.fs = fs;
+      this.path = path;
+      this.scope = scope;
+      this.suppressExceptions = suppressExceptions;
+      this.filter = filter;
+    }
+
+    @Override
+    protected List<FileStatus> compute() {
+      List<FileStatus> statuses = new ArrayList<>();
+      List<RecursiveListing> tasks = new ArrayList<>();
+
+      try {
+        for (FileStatus status : fs.listStatus(path, filter)) {
+          if (isStatusApplicable(status, scope)) {
+            statuses.add(status);
+          }
+          if (status.isDirectory()) {
+            RecursiveListing task = new RecursiveListing(fs, status.getPath(), scope, suppressExceptions, filter);
+            task.fork();
+            tasks.add(task);
+          }
+        }
+      } catch (Exception e) {
+        if (suppressExceptions) {
+          logger.debug("Exception during listing file statuses", e);
+        } else {
+          // is used to re-throw checked exception
+          ErrorHelper.sneakyThrow(e);
         }
       }
-    } catch (Exception e) {
-      if (suppressExceptions) {
-        logger.debug("Exception during listing file statuses", e);
-      } else {
-        throw e;
-      }
+
+      tasks.stream()
+        .map(ForkJoinTask::join)
+        .forEach(statuses::addAll);
+
+      return statuses;
     }
-    return statuses;
   }
+
 }