You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/07/21 17:23:41 UTC

[5/5] drill git commit: DRILL-4720: Fix SchemaPartitionExplorer.getSubPartitions method implementations to return only Drill file system directories

DRILL-4720: Fix SchemaPartitionExplorer.getSubPartitions method implementations to return only Drill file system directories

1. Added file system util helper classes, with appropriate unit tests, to standardize how Drill lists directory and file statuses.
2. Fixed SchemaPartitionExplorer.getSubPartitions method implementations to return only directories that can be partitions according to Drill file system rules
(excluded all files and directories that start with dot or underscore).
3. Added unit tests for directory explorer UDFs with and without a metadata cache present.
4. Minor refactoring.

closes #864


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a0c178ba
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a0c178ba
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a0c178ba

Branch: refs/heads/master
Commit: a0c178babb6d82a30af8fdf5e912cbc9c9526a85
Parents: 368bc38
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Jun 29 16:08:33 2017 +0300
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri Jul 21 19:38:35 2017 +0300

----------------------------------------------------------------------
 .../planner/sql/handlers/ShowFileHandler.java   |  13 +-
 .../drill/exec/store/dfs/DrillFileSystem.java   |  36 +--
 .../drill/exec/store/dfs/DrillPathFilter.java   |  34 ---
 .../drill/exec/store/dfs/FileSelection.java     |  28 +--
 .../exec/store/dfs/FileSystemSchemaFactory.java |   3 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  17 +-
 .../exec/store/parquet/FooterGatherer.java      |  12 +-
 .../drill/exec/store/parquet/Metadata.java      |  36 +--
 .../exec/store/parquet/ParquetFormatPlugin.java |  14 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  16 +-
 .../store/sys/store/LocalPersistentStore.java   |  21 +-
 .../drill/exec/util/DrillFileSystemUtil.java    |  91 +++++++
 .../apache/drill/exec/util/FileSystemUtil.java  | 207 ++++++++++++++++
 .../exec/planner/TestDirectoryExplorerUDFs.java |  70 +++++-
 .../exec/util/DrillFileSystemUtilTest.java      | 158 ++++++++++++
 .../drill/exec/util/FileSystemUtilTest.java     | 240 +++++++++++++++++++
 .../drill/exec/util/FileSystemUtilTestBase.java | 112 +++++++++
 17 files changed, 942 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
index fb564a2..5e6af7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -33,6 +33,7 @@ import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.util.FileSystemUtil;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -50,8 +51,8 @@ public class ShowFileHandler extends DefaultSqlHandler {
 
     SqlIdentifier from = ((SqlShowFiles) sqlNode).getDb();
 
-    DrillFileSystem fs = null;
-    String defaultLocation = null;
+    DrillFileSystem fs;
+    String defaultLocation;
     String fromDir = "./";
 
     SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
@@ -93,9 +94,9 @@ public class ShowFileHandler extends DefaultSqlHandler {
 
     List<ShowFilesCommandResult> rows = new ArrayList<>();
 
-    for (FileStatus fileStatus : fs.list(false, new Path(defaultLocation, fromDir))) {
-      ShowFilesCommandResult result = new ShowFilesCommandResult(fileStatus.getPath().getName(), fileStatus.isDir(),
-                                                                 !fileStatus.isDir(), fileStatus.getLen(),
+    for (FileStatus fileStatus : FileSystemUtil.listAll(fs, new Path(defaultLocation, fromDir), false)) {
+      ShowFilesCommandResult result = new ShowFilesCommandResult(fileStatus.getPath().getName(), fileStatus.isDirectory(),
+                                                                 fileStatus.isFile(), fileStatus.getLen(),
                                                                  fileStatus.getOwner(), fileStatus.getGroup(),
                                                                  fileStatus.getPermission().toString(),
                                                                  fileStatus.getAccessTime(), fileStatus.getModificationTime());

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index e03cf22..52e1a96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -62,7 +62,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -75,8 +74,8 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
   private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled();
 
-  public static final String HIDDEN_FILE_PREFIX = "_";
-  public static final String DOT_FILE_PREFIX = ".";
+  public static final String UNDERSCORE_PREFIX = "_";
+  public static final String DOT_PREFIX = ".";
 
   private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
 
@@ -747,35 +746,6 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
     underlyingFs.removeXAttr(path, name);
   }
 
-  public List<FileStatus> list(boolean recursive, Path... paths) throws IOException {
-    if (recursive) {
-      List<FileStatus> statuses = Lists.newArrayList();
-      for (Path p : paths) {
-        addRecursiveStatus(underlyingFs.getFileStatus(p), statuses);
-      }
-      return statuses;
-
-    } else {
-      return Lists.newArrayList(underlyingFs.listStatus(paths));
-    }
-  }
-
-  private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
-    if (parent.isDir()) {
-      Path pattern = new Path(parent.getPath(), "*");
-      FileStatus[] sub = underlyingFs.globStatus(pattern, new DrillPathFilter());
-      for(FileStatus s : sub){
-        if (s.isDir()) {
-          addRecursiveStatus(s, listToFill);
-        } else {
-          listToFill.add(s);
-        }
-      }
-    } else {
-      listToFill.add(parent);
-    }
-  }
-
   public InputStream openPossiblyCompressedStream(Path path) throws IOException {
     CompressionCodec codec = codecFactory.getCodec(path); // infers from file ext.
     if (codec != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
deleted file mode 100644
index 00f463d..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Utils;
-
-public class DrillPathFilter extends Utils.OutputFileUtils.OutputFilesFilter {
-  @Override
-  public boolean accept(Path path) {
-    if (path.getName().startsWith(DrillFileSystem.HIDDEN_FILE_PREFIX)) {
-      return false;
-    }
-    if (path.getName().startsWith(DrillFileSystem.DOT_FILE_PREFIX)) {
-      return false;
-    }
-    return super.accept(path);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 3a89591..7682d69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -20,20 +20,15 @@ package org.apache.drill.exec.store.dfs;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.Nullable;
-
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -166,23 +161,16 @@ public class FileSelection {
       return this;
     }
     Stopwatch timer = Stopwatch.createStarted();
-    final List<FileStatus> statuses = getStatuses(fs);
-    final int total = statuses.size();
-    final Path[] paths = new Path[total];
-    for (int i=0; i<total; i++) {
-      paths[i] = statuses.get(i).getPath();
+    List<FileStatus> statuses = getStatuses(fs);
+
+    List<FileStatus> nonDirectories = Lists.newArrayList();
+    for (FileStatus status : statuses) {
+      nonDirectories.addAll(DrillFileSystemUtil.listFiles(fs, status.getPath(), true));
     }
-    final List<FileStatus> allStats = fs.list(true, paths);
-    final List<FileStatus> nonDirectories = Lists.newArrayList(Iterables.filter(allStats, new Predicate<FileStatus>() {
-      @Override
-      public boolean apply(@Nullable FileStatus status) {
-        return !status.isDirectory();
-      }
-    }));
 
     final FileSelection fileSel = create(nonDirectories, null, selectionRoot);
     logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}",
-        timer.elapsed(TimeUnit.MILLISECONDS), total);
+        timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());
 
     // fileSel will be null if we query an empty folder
     if (fileSel != null) {
@@ -425,7 +413,7 @@ public class FileSelection {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("root=" + this.selectionRoot);
+    sb.append("root=").append(this.selectionRoot);
 
     sb.append("files=[");
     boolean isFirst = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index e3e01c4..cf30162 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -95,7 +96,7 @@ public class FileSystemSchemaFactory implements SchemaFactory{
                                             ) throws PartitionNotFoundException {
       List<FileStatus> fileStatuses;
       try {
-        fileStatuses = defaultSchema.getFS().list(false, new Path(defaultSchema.getDefaultLocation(), table));
+        fileStatuses = DrillFileSystemUtil.listDirectories(defaultSchema.getFS(), new Path(defaultSchema.getDefaultLocation(), table), false);
       } catch (IOException e) {
         throw new PartitionNotFoundException("Error finding partitions for table " + table, e);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 8416ed8..b2798a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -64,6 +64,7 @@ import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -404,7 +405,7 @@ public class WorkspaceSchemaFactory {
 
       List<FileStatus> fileStatuses;
       try {
-        fileStatuses = getFS().list(false, new Path(getDefaultLocation(), table));
+        fileStatuses = DrillFileSystemUtil.listDirectories(getFS(), new Path(getDefaultLocation(), table), false);
       } catch (IOException e) {
         throw new PartitionNotFoundException("Error finding partitions for table " + table, e);
       }
@@ -639,12 +640,12 @@ public class WorkspaceSchemaFactory {
     }
 
     /**
-     * Check if the table contains homogenenous files that can be read by Drill. Eg: parquet, json csv etc.
+     * Check if the table contains homogeneous files that can be read by Drill. Eg: parquet, json csv etc.
      * However if it contains more than one of these formats or a totally different file format that Drill cannot
      * understand then we will raise an exception.
-     * @param tableName - name of the table to be checked for homogeneous property
-     * @return
-     * @throws IOException
+     * @param tableName name of the table to be checked for homogeneous property
+     * @return true if table contains homogeneous files, false otherwise
+     * @throws IOException in case of problems accessing table files
      */
     private boolean isHomogeneous(String tableName) throws IOException {
       FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), tableName);
@@ -663,7 +664,7 @@ public class WorkspaceSchemaFactory {
       while (!listOfFiles.isEmpty()) {
         FileStatus currentFile = listOfFiles.poll();
         if (currentFile.isDirectory()) {
-          listOfFiles.addAll(fs.list(true, currentFile.getPath()));
+          listOfFiles.addAll(DrillFileSystemUtil.listFiles(fs, currentFile.getPath(), true));
         } else {
           if (matcher != null) {
             if (!matcher.isFileReadable(fs, currentFile)) {
@@ -709,7 +710,7 @@ public class WorkspaceSchemaFactory {
         long time =  (System.currentTimeMillis()/1000);
         Long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
         Long p2 = r.nextLong();
-        final String fileNameDelimiter = DrillFileSystem.HIDDEN_FILE_PREFIX;
+        final String fileNameDelimiter = DrillFileSystem.UNDERSCORE_PREFIX;
         String[] pathSplit = table.split(Path.SEPARATOR);
         /*
          * Builds the string for the renamed table
@@ -718,7 +719,7 @@ public class WorkspaceSchemaFactory {
          * separated by underscores
          */
         tableRenameBuilder
-            .append(DrillFileSystem.HIDDEN_FILE_PREFIX)
+            .append(DrillFileSystem.UNDERSCORE_PREFIX)
             .append(pathSplit[pathSplit.length - 1])
             .append(fileNameDelimiter)
             .append(p1.toString())

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index b68ffbb..3ba6ff0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.exec.store.TimedRunnable;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -68,10 +68,10 @@ public class FooterGatherer {
   public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException {
     final List<TimedRunnable<Footer>> readers = Lists.newArrayList();
     List<Footer> foundFooters = Lists.newArrayList();
-    for(FileStatus status : statuses){
+    for (FileStatus status : statuses) {
 
 
-      if(status.isDirectory()){
+      if (status.isDirectory()){
         // first we check for summary file.
         FileSystem fs = status.getPath().getFileSystem(conf);
 
@@ -83,10 +83,10 @@ public class FooterGatherer {
         }
 
         // else we handle as normal file.
-        for(FileStatus inStatus : fs.listStatus(status.getPath(), new DrillPathFilter())){
+        for (FileStatus inStatus : DrillFileSystemUtil.listFiles(fs, status.getPath(), false)){
           readers.add(new FooterReader(conf, inStatus));
         }
-      }else{
+      } else {
         readers.add(new FooterReader(conf, status));
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 0a4ce60..d9b99f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -31,7 +31,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.TimedRunnable;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.BlockLocation;
@@ -179,7 +179,7 @@ public class Metadata {
 
     final List<FileStatus> childFiles = Lists.newArrayList();
 
-    for (final FileStatus file : fs.listStatus(p, new DrillPathFilter())) {
+    for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) {
       if (file.isDirectory()) {
         ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString())).getLeft();
         metaDataList.addAll(subTableMetadata.files);
@@ -233,17 +233,22 @@ public class Metadata {
   }
 
   /**
-   * Get the parquet metadata for the parquet files in a directory
+   * Get the parquet metadata for the parquet files in a directory.
    *
    * @param path the path of the directory
-   * @return
-   * @throws IOException
+   * @return metadata object for an entire parquet directory structure
+   * @throws IOException in case of problems accessing files
    */
   private ParquetTableMetadata_v3 getParquetTableMetadata(String path) throws IOException {
     Path p = new Path(path);
     FileStatus fileStatus = fs.getFileStatus(p);
     final Stopwatch watch = Stopwatch.createStarted();
-    List<FileStatus> fileStatuses = getFileStatuses(fileStatus);
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    if (fileStatus.isFile()) {
+      fileStatuses.add(fileStatus);
+    } else {
+      fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, p, true));
+    }
     logger.info("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS));
     watch.reset();
     watch.start();
@@ -290,25 +295,6 @@ public class Metadata {
   }
 
   /**
-   * Recursively get a list of files
-   *
-   * @param fileStatus
-   * @return
-   * @throws IOException
-   */
-  private List<FileStatus> getFileStatuses(FileStatus fileStatus) throws IOException {
-    List<FileStatus> statuses = Lists.newArrayList();
-    if (fileStatus.isDirectory()) {
-      for (FileStatus child : fs.listStatus(fileStatus.getPath(), new DrillPathFilter())) {
-        statuses.addAll(getFileStatuses(child));
-      }
-    } else {
-      statuses.add(fileStatus);
-    }
-    return statuses;
-  }
-
-  /**
    * TimedRunnable that reads the footer from parquet and collects file metadata
    */
   private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata_v3> {

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 0eb4665..3f331b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -41,10 +41,10 @@ import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
@@ -56,7 +56,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 
@@ -260,13 +259,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
           if (metaDataFileExists(fs, dir)) {
             return true;
           }
-          PathFilter filter = new DrillPathFilter();
-
-          FileStatus[] files = fs.listStatus(dir.getPath(), filter);
-          if (files.length == 0) {
-            return false;
-          }
-          return super.isFileReadable(fs, files[0]);
+          List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, dir.getPath(), false);
+          return !statuses.isEmpty() && super.isFileReadable(fs, statuses.get(0));
         }
       } catch (IOException e) {
         logger.info("Failure while attempting to check for Parquet metadata file.", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 3d9cfb3..30f607d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -57,8 +57,8 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ImplicitColumnExplorer;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.store.dfs.MetadataContext.PruneStatus;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
@@ -743,7 +743,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       } else {
         final List<FileStatus> fileStatuses = Lists.newArrayList();
         for (ReadEntryWithPath entry : entries) {
-          getFiles(entry.getPath(), fileStatuses);
+          fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(new Path(entry.getPath())), true));
         }
         parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses, formatConfig);
       }
@@ -857,18 +857,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return this.endpointAffinities;
   }
 
-  private void getFiles(String path, List<FileStatus> fileStatuses) throws IOException {
-    Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
-    FileStatus fileStatus = fs.getFileStatus(p);
-    if (fileStatus.isDirectory()) {
-      for (FileStatus f : fs.listStatus(p, new DrillPathFilter())) {
-        getFiles(f.getPath().toString(), fileStatuses);
-      }
-    } else {
-      fileStatuses.add(fileStatus);
-    }
-  }
-
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException {
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
index dc4c414..320a864 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
@@ -39,6 +39,7 @@ import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.exception.VersionMismatchException;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.sys.BasePersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreMode;
@@ -51,6 +52,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.PathFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,17 +116,22 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
   public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
     try (AutoCloseableLock lock = readLock.open()) {
       try {
-        List<FileStatus> f = fs.list(false, basePath);
-        if (f == null || f.isEmpty()) {
+        // list only files with sys file suffix
+        PathFilter sysFileSuffixFilter = new PathFilter() {
+          @Override
+          public boolean accept(Path path) {
+            return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX);
+          }
+        };
+        List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter);
+        if (fileStatuses.isEmpty()) {
           return Collections.emptyIterator();
         }
-        List<String> files = Lists.newArrayList();
 
-        for (FileStatus stat : f) {
+        List<String> files = Lists.newArrayList();
+        for (FileStatus stat : fileStatuses) {
           String s = stat.getPath().getName();
-          if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
-            files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
-          }
+          files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
         }
 
         Collections.sort(files);

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
new file mode 100644
index 0000000..56d9385
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * In Drill file system all directories and files that start with dot or underscore are ignored.
+ * This helper class delegates all work of listing directory and file statuses to {@link org.apache.drill.exec.util.FileSystemUtil} class,
+ * only adding the Drill file system filter first.
+ */
+public class DrillFileSystemUtil {
+
+  /**
+   * Path filter that skips all files and folders that start with dot or underscore.
+   */
+  public static final PathFilter DRILL_SYSTEM_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return !path.getName().startsWith(DrillFileSystem.UNDERSCORE_PREFIX) && !path.getName().startsWith(DrillFileSystem.DOT_PREFIX);
+    }
+  };
+
+  /**
+   * Returns statuses of all directories present in given path applying custom filters if present.
+   * Directories that start with dot or underscore are skipped.
+   * Will also include nested directories if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to directory
+   * @param recursive true if nested directories should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching directory statuses
+   */
+  public static List<FileStatus> listDirectories(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    return FileSystemUtil.listDirectories(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
+  }
+
+  /**
+   * Returns statuses of all files present in given path applying custom filters if present.
+   * Files and nested directories that start with dot or underscore are skipped.
+   * Will also include files from nested directories if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if files in nested directories should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching file statuses
+   */
+  public static List<FileStatus> listFiles(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    return FileSystemUtil.listFiles(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
+  }
+
+  /**
+   * Returns statuses of all directories and files present in given path applying custom filters if present.
+   * Directories and files that start with dot or underscore are skipped.
+   * Will also include nested directories and their files if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if nested directories and their files should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching directory and file statuses
+   */
+  public static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    return FileSystemUtil.listAll(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
new file mode 100644
index 0000000..84b22b6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Helper class that provides methods to list statuses of directories, files, or both.
+ * Can list statuses recursively and apply custom filters.
+ */
+public class FileSystemUtil {
+
+  /**
+   * Filter that will accept all files and directories.
+   */
+  public static final PathFilter DUMMY_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return true;
+    }
+  };
+
+  /**
+   * Returns statuses of all directories present in given path applying custom filters if present.
+   * Will also include nested directories if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to directory
+   * @param recursive true if nested directories should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching directory statuses
+   */
+  public static List<FileStatus> listDirectories(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    List<FileStatus> statuses = new ArrayList<>();
+    listDirectories(fs, path, recursive, statuses, mergeFilters(filters));
+    return statuses;
+  }
+
+  /**
+   * Returns statuses of all files present in given path applying custom filters if present.
+   * Will also include files from nested directories if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if files in nested directories should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching file statuses
+   */
+  public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    List<FileStatus> statuses = new ArrayList<>();
+    listFiles(fs, path, recursive, statuses, mergeFilters(filters));
+    return statuses;
+  }
+
+  /**
+   * Returns statuses of all directories and files present in given path applying custom filters if present.
+   * Will also include nested directories and their files if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if nested directories and their files should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching directory and file statuses
+   */
+  public static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    List<FileStatus> statuses = new ArrayList<>();
+    listAll(fs, path, recursive, statuses, mergeFilters(filters));
+    return statuses;
+  }
+
+  /**
+   * Merges given filter with array of filters.
+   * If array of filters is null or empty, will return given filter.
+   *
+   * @param filter given filter
+   * @param filters array of filters
+   * @return one filter that combines all given filters
+   */
+  public static PathFilter mergeFilters(PathFilter filter, PathFilter[] filters) {
+    if (filters == null || filters.length == 0) {
+      return filter;
+    }
+
+    int length = filters.length;
+    PathFilter[] newFilters = Arrays.copyOf(filters, length + 1);
+    newFilters[length] = filter;
+    return mergeFilters(newFilters);
+  }
+
+  /**
+   * Will merge given array of filters into one.
+   * If given array of filters is empty, will return {@link #DUMMY_FILTER}.
+   *
+   * @param filters array of filters
+   * @return one filter that combines all given filters
+   */
+  public static PathFilter mergeFilters(final PathFilter... filters) {
+    if (filters.length == 0) {
+      return DUMMY_FILTER;
+    }
+
+    return new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        for (PathFilter filter : filters) {
+          if (!filter.accept(path)) {
+            return false;
+          }
+        }
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Helper method that will store in given holder statuses of all directories present in given path applying custom filter.
+   * If recursive flag is set to true, will call itself recursively to add statuses of nested directories.
+   *
+   * @param fs current file system
+   * @param path path to directory
+   * @param recursive true if nested directories should be included
+   * @param statuses holder for directory statuses
+   * @param filter custom filter
+   * @return holder with all matching directory statuses
+   */
+  private static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, List<FileStatus> statuses, PathFilter filter) throws IOException {
+    FileStatus[] fileStatuses = fs.listStatus(path, filter);
+    for (FileStatus status: fileStatuses) {
+      if (status.isDirectory()) {
+        statuses.add(status);
+        if (recursive) {
+          listDirectories(fs, status.getPath(), true, statuses, filter);
+        }
+      }
+    }
+    return statuses;
+  }
+
+  /**
+   * Helper method that will store in given holder statuses of all files present in given path applying custom filter.
+   * If recursive flag is set to true, will call itself recursively to add file statuses from nested directories.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if files in nested directories should be included
+   * @param statuses holder for file statuses
+   * @param filter custom filter
+   * @return holder with all matching file statuses
+   */
+  private static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, List<FileStatus> statuses, PathFilter filter) throws IOException {
+    FileStatus[] fileStatuses = fs.listStatus(path, filter);
+    for (FileStatus status: fileStatuses) {
+      if (status.isDirectory()) {
+        if (recursive) {
+          listFiles(fs, status.getPath(), true, statuses, filter);
+        }
+      } else {
+        statuses.add(status);
+      }
+    }
+    return statuses;
+  }
+
+  /**
+   * Helper method that will store in given holder statuses of all directories and files present in given path applying custom filter.
+   * If recursive flag is set to true, will call itself recursively to add nested directories and their file statuses.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if nested directories and their files should be included
+   * @param statuses holder for directory and file statuses
+   * @param filter custom filter
+   * @return holder with all matching directory and file statuses
+   */
+  private static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, List<FileStatus> statuses, PathFilter filter) throws IOException {
+    for (FileStatus status: fs.listStatus(path, filter)) {
+      statuses.add(status);
+      if (status.isDirectory() && recursive) {
+        listAll(fs, status.getPath(), true, statuses, filter);
+      }
+    }
+    return statuses;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
index a5916a5..4458d58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -14,20 +14,26 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- ******************************************************************************/
+*/
 package org.apache.drill.exec.planner;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.TestUtilities;
 import org.apache.drill.exec.util.Text;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -188,4 +194,64 @@ public class TestDirectoryExplorerUDFs extends PlanTestBase {
     }
   }
 
+  @Test // DRILL-4720
+  public void testDirectoryUDFsWithAndWithoutMetadataCache() throws Exception {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(new Configuration());
+
+      // prepare test table with partitions
+      Path table = new Path(getTempDir("table_with_partitions"));
+      String tablePath = table.toUri().getPath();
+      Path dataFile = new Path(TestTools.getWorkingPath(),"src/test/resources/parquet/alltypes_required.parquet");
+      createPartitions(fs, table, dataFile, 2);
+
+      Map<String, String> configurations = ImmutableMap.<String, String>builder()
+          .put("mindir", "part_1")
+          .put("imindir", "part_1")
+          .put("maxdir", "part_2")
+          .put("imaxdir", "part_2")
+          .build();
+
+      String query = "select dir0 from dfs.`%s` where dir0 = %s('dfs', '%s') limit 1";
+
+      // run tests without metadata cache
+      for (Map.Entry<String, String> entry : configurations.entrySet()) {
+        testBuilder()
+            .sqlQuery(query, tablePath, entry.getKey(), tablePath)
+            .unOrdered()
+            .baselineColumns("dir0")
+            .baselineValues(entry.getValue())
+            .go()
+        ;
+      }
+
+      // generate metadata
+      test("refresh table metadata dfs.`%s`", tablePath);
+
+      // run tests with metadata cache
+      for (Map.Entry<String, String> entry : configurations.entrySet()) {
+        testBuilder()
+            .sqlQuery(query, tablePath, entry.getKey(), tablePath)
+            .unOrdered()
+            .baselineColumns("dir0")
+            .baselineValues(entry.getValue())
+            .go();
+      }
+
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+    }
+  }
+
+  private void createPartitions(FileSystem fs, Path table, Path dataFile, int number) throws IOException {
+    for (int i = 1; i <= number; i++) {
+      Path partition = new Path(table, "part_" + i);
+      fs.mkdirs(partition);
+      FileUtil.copy(fs, dataFile, fs, partition, false, true, fs.getConf());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/test/java/org/apache/drill/exec/util/DrillFileSystemUtilTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/DrillFileSystemUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/DrillFileSystemUtilTest.java
new file mode 100644
index 0000000..e26c5c6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/DrillFileSystemUtilTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class DrillFileSystemUtilTest extends FileSystemUtilTestBase {
+
+  @Test
+  public void testListDirectoriesWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listDirectories(fs, base, false);
+    assertEquals("Directory count should match", 2, statuses.size());
+  }
+
+  @Test
+  public void testListDirectoriesWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listDirectories(fs, base, false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a");
+      }
+    });
+    assertEquals("Directory count should match", 1, statuses.size());
+    assertEquals("Directory name should match", "a", statuses.get(0).getPath().getName());
+  }
+
+  @Test
+  public void testListDirectoriesRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listDirectories(fs, base, true);
+    assertEquals("Directory count should match", 3, statuses.size());
+  }
+
+  @Test
+  public void testListDirectoriesRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listDirectories(fs, base, true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a");
+      }
+    });
+    assertEquals("Directory count should match", 2, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", "a", statuses.get(0).getPath().getName());
+    assertEquals("Directory name should match", "aa", statuses.get(1).getPath().getName());
+  }
+
+  @Test
+  public void testListFilesWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, new Path(base, "a"), false);
+    assertEquals("File count should match", 1, statuses.size());
+    assertEquals("File name should match", "f.txt", statuses.get(0).getPath().getName());
+  }
+
+  @Test
+  public void testListFilesWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, new Path(base, "a"), false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("File count should match", 1, statuses.size());
+    assertEquals("File name should match", "f.txt", statuses.get(0).getPath().getName());
+  }
+
+  @Test
+  public void testListFilesRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, base, true);
+    assertEquals("File count should match", 3, statuses.size());
+  }
+
+  @Test
+  public void testListFilesRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, base, true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("File count should match", 2, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("File name should match", "f.txt", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(1).getPath().getName());
+  }
+
+  @Test
+  public void testListAllWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listAll(fs, new Path(base, "a"), false);
+    assertEquals("File count should match", 2, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("File name should match", "aa", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(1).getPath().getName());
+  }
+
+  @Test
+  public void testListAllWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listAll(fs, new Path(base, "a"), false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("Directory and file count should match", 2, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", "aa", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(1).getPath().getName());
+  }
+
+  @Test
+  public void testListAllRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listAll(fs, base, true);
+    assertEquals("Directory and file count should match", 6, statuses.size());
+  }
+
+  @Test
+  public void testListAllRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listAll(fs, new Path(base, "a"), true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("Directory and file count should match", 3, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", "aa", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(1).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(2).getPath().getName());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
new file mode 100644
index 0000000..47883e4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FileSystemUtilTest extends FileSystemUtilTestBase {
+
+  @Test
+  public void testListDirectoriesWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, false);
+    assertEquals("Directory count should match", 4, statuses.size());
+  }
+
+  @Test
+  public void testListDirectoriesWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a");
+      }
+    });
+    assertEquals("Directory count should match", 3, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", ".a", statuses.get(0).getPath().getName());
+    assertEquals("Directory name should match", "_a", statuses.get(1).getPath().getName());
+    assertEquals("Directory name should match", "a", statuses.get(2).getPath().getName());
+  }
+
+  @Test
+  public void testListDirectoriesRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, true);
+    assertEquals("Directory count should match", 5, statuses.size());
+  }
+
+  @Test
+  public void testListDirectoriesRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a");
+      }
+    });
+    assertEquals("Directory count should match", 4, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", ".a", statuses.get(0).getPath().getName());
+    assertEquals("Directory name should match", "_a", statuses.get(1).getPath().getName());
+    assertEquals("Directory name should match", "a", statuses.get(2).getPath().getName());
+    assertEquals("Directory name should match", "aa", statuses.get(3).getPath().getName());
+  }
+
+  @Test
+  public void testListDirectoriesEmptyResult() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("abc");
+      }
+    });
+    assertEquals("Directory count should match", 0, statuses.size());
+  }
+
+  @Test
+  public void testListFilesWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, new Path(base, "a"), false);
+    assertEquals("File count should match", 3, statuses.size());
+  }
+
+  @Test
+  public void testListFilesWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, new Path(base, "a"), false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("File count should match", 3, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("File name should match", ".f.txt", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "_f.txt", statuses.get(1).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(2).getPath().getName());
+  }
+
+  @Test
+  public void testListFilesRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, base, true);
+    assertEquals("File count should match", 11, statuses.size());
+  }
+
+  @Test
+  public void testListFilesRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, base, true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+
+    assertEquals("File count should match", 8, statuses.size());
+  }
+
+  @Test
+  public void testListFilesEmptyResult() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, base, false);
+    assertEquals("File count should match", 0, statuses.size());
+  }
+
+  @Test
+  public void testListAllWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, new Path(base, "a"), false);
+    assertEquals("Directory and file count should match", 4, statuses.size());
+  }
+
+  @Test
+  public void testListAllWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, new Path(base, "a"), false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("Directory and file count should match", 4, statuses.size());
+  }
+
+  @Test
+  public void testListAllRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, new Path(base, "a"), true);
+    assertEquals("Directory and file count should match", 7, statuses.size());
+  }
+
+  @Test
+  public void testListAllRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, new Path(base, "a"), true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("Directory and file count should match", 7, statuses.size());
+  }
+
+  @Test
+  public void testListAllEmptyResult() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, base, false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("xyz");
+      }
+    });
+    assertEquals("Directory and file count should match", 0, statuses.size());
+  }
+
+  @Test
+  public void testMergeFiltersWithMissingParameters() {
+    PathFilter filter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("a");
+      }
+    };
+
+    assertEquals("Should have returned initial filter", filter, FileSystemUtil.mergeFilters(filter, null));
+    assertEquals("Should have returned initial filter", filter, FileSystemUtil.mergeFilters(filter, new PathFilter[]{}));
+    assertEquals("Should have returned dummy filter", FileSystemUtil.DUMMY_FILTER, FileSystemUtil.mergeFilters());
+  }
+
+  @Test
+  public void mergeFiltersTrue() {
+    Path file = new Path("abc.txt");
+
+    PathFilter firstFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("a");
+      }
+    };
+
+    PathFilter secondFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".txt");
+      }
+    };
+
+    assertTrue("Path should have been included in the path list", FileSystemUtil.mergeFilters(firstFilter, secondFilter).accept(file));
+    assertTrue("Path should have been included in the path list", FileSystemUtil.mergeFilters(firstFilter, new PathFilter[] {secondFilter}).accept(file));
+  }
+
+  @Test
+  public void mergeFiltersFalse() {
+    Path file = new Path("abc.txt");
+
+    PathFilter firstFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("a");
+      }
+    };
+
+    PathFilter secondFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".csv");
+      }
+    };
+
+    assertFalse("Path should have been excluded from the path list", FileSystemUtil.mergeFilters(firstFilter, secondFilter).accept(file));
+    assertFalse("Path should have been excluded from the path list", FileSystemUtil.mergeFilters(firstFilter, new PathFilter[] {secondFilter}).accept(file));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTestBase.java
new file mode 100644
index 0000000..1df25ee
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTestBase.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import com.google.common.base.Strings;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Base class for tests of the file system utility classes.
+ * Before the tests of a subclass run, it opens a file system handle and creates
+ * the temporary directories and files (see the layout below) that the tests rely on.
+ */
+public class FileSystemUtilTestBase {
+
+  /*
+    Directory and file structure created during test initialization:
+    ../a
+    ../a/f.txt
+    ../a/.f.txt
+    ../a/_f.txt
+
+    ../a/aa
+    ../a/aa/f.txt
+    ../a/aa/.f.txt
+    ../a/aa/_f.txt
+
+    ../b
+    ../b/f.txt
+    ../b/.f.txt
+    ../b/_f.txt
+
+    ../.a
+    ../.a/f.txt
+
+    ../_a
+    ../_a/f.txt
+  */
+  protected static FileSystem fs;
+  protected static Path base;
+
+  /**
+   * Opens the file system and builds the fixture tree under a fresh temporary directory.
+   *
+   * @throws IOException if the file system cannot be opened or a fixture entry cannot be created
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    // The fixture lives in a java.io temporary directory, so it has to be accessed through the
+    // local file system no matter which default file system the classpath configuration names.
+    // A private, non-cached instance is requested because tearDown() closes it: closing the
+    // shared instance handed out by FileSystem.get() would close it for all its other users.
+    fs = FileSystem.newInstanceLocal(new Configuration());
+
+    // create temporary directory with sub-folders and files
+    final File tempDir = Files.createTempDir();
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        FileUtils.deleteQuietly(tempDir);
+      }
+    });
+    base = new Path(tempDir.toURI().getPath());
+
+    createDefaultStructure(fs, base, "a", 2);
+    createDefaultStructure(fs, base, "b", 1);
+
+    // create hidden (dot-prefixed) and underscore-prefixed directories, each with one file
+    for (String directoryName : Arrays.asList(".a", "_a")) {
+      Path directory = createDirectory(fs, new Path(base, directoryName));
+      createFile(fs, new Path(directory, "f.txt"));
+    }
+  }
+
+  /**
+   * Closes the file system handle opened by {@link #setup()}, if there is one.
+   *
+   * @throws Exception if closing the file system fails
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+  }
+
+  /**
+   * Creates a chain of nested directories, each populated with the files
+   * {@code f.txt}, {@code .f.txt} and {@code _f.txt}. The directory at level {@code i}
+   * (starting from 1) is named by repeating {@code name} {@code i} times and is placed
+   * inside the directory of the previous level, e.g. {@code a} and then {@code a/aa}.
+   *
+   * @param fs file system used to create the entries
+   * @param base directory under which the first level is created
+   * @param name name that is repeated to build each directory name
+   * @param nesting number of nested levels to create
+   * @throws IOException if a directory or file cannot be created
+   */
+  private static void createDefaultStructure(FileSystem fs, Path base, String name, int nesting) throws IOException {
+    Path parent = base;
+    for (int level = 1; level <= nesting; level++) {
+      Path directory = createDirectory(fs, new Path(parent, Strings.repeat(name, level)));
+      for (String fileName : Arrays.asList("f.txt", ".f.txt", "_f.txt")) {
+        createFile(fs, new Path(directory, fileName));
+      }
+      parent = directory;
+    }
+  }
+
+  /**
+   * Creates the given directory and fails fast when the file system reports that it could not.
+   *
+   * @param fs file system used to create the directory
+   * @param directory directory to create
+   * @return the same {@code directory}, for call chaining
+   * @throws IOException if the directory cannot be created
+   */
+  private static Path createDirectory(FileSystem fs, Path directory) throws IOException {
+    if (!fs.mkdirs(directory)) {
+      throw new IOException("Unable to create test directory: " + directory);
+    }
+    return directory;
+  }
+
+  /**
+   * Creates the given empty file and fails fast when the file system reports that it did not.
+   *
+   * @param fs file system used to create the file
+   * @param file file to create
+   * @throws IOException if the file cannot be created
+   */
+  private static void createFile(FileSystem fs, Path file) throws IOException {
+    if (!fs.createNewFile(file)) {
+      throw new IOException("Unable to create test file: " + file);
+    }
+  }
+
+}