Posted to commits@drill.apache.org by am...@apache.org on 2016/08/17 20:14:14 UTC

drill git commit: DRILL-4846: Fix a few performance issues for metadata access:

Repository: drill
Updated Branches:
  refs/heads/master 0a4c21cc1 -> 57dc9f43b


DRILL-4846: Fix a few performance issues for metadata access:

 - Create a MetadataContext that can be shared among multiple invocations of the Metadata APIs (a usage sketch follows below).
 - Check directory modification time only if it was not previously checked.
 - Remove a redundant metadata read call.
 - Add more logging.
 - Consolidate a couple of metadata methods.
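
For readers skimming the diff: the call pattern the first two bullets introduce looks roughly like the Java sketch below. It uses only APIs added in this commit (MetadataContext and the new three-argument Metadata.readMetadataDirs / Metadata.readBlockMeta); the wrapper method, variable names, and paths are illustrative, not code from the patch.

    import java.io.IOException;

    import org.apache.drill.exec.store.dfs.MetadataContext;
    import org.apache.drill.exec.store.parquet.Metadata;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MetadataContextSketch {
      // Hypothetical helper showing one context shared across metadata
      // reads for a single table within a single query.
      static void readTableMetadata(FileSystem fs, Path dirMetaPath, Path metaFilePath)
          throws IOException {
        // One context per table per query (created in ParquetFormatPlugin below).
        MetadataContext metaContext = new MetadataContext();

        // The first read checks directory modification times and records each
        // checked directory in the context via metaContext.setStatus(dir).
        Metadata.readMetadataDirs(fs, dirMetaPath.toString(), metaContext);

        // A later read handed the same context skips directories whose
        // modification time was already verified.
        Metadata.readBlockMeta(fs, metaFilePath.toString(), metaContext);
      }
    }

In the patch itself the context travels on the FileSelection (setMetaContext/getMetaContext) so that PruneScanRule and ParquetGroupScan.init() can pick it up, as the hunks below show.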

close apache/drill#569


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/57dc9f43
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/57dc9f43
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/57dc9f43

Branch: refs/heads/master
Commit: 57dc9f43b6c578c1f90fb7f9e2495b184ab17b21
Parents: 0a4c21c
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed Aug 3 09:00:51 2016 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Wed Aug 17 12:04:00 2016 -0700

----------------------------------------------------------------------
 .../planner/AbstractPartitionDescriptor.java    |   3 +-
 .../planner/FileSystemPartitionDescriptor.java  |  11 +-
 .../planner/ParquetPartitionDescriptor.java     |  10 +-
 .../drill/exec/planner/PartitionDescriptor.java |   4 +-
 .../logical/partition/PruneScanRule.java        |  24 +++-
 .../drill/exec/store/dfs/FileSelection.java     |  21 ++-
 .../drill/exec/store/dfs/MetadataContext.java   |  60 +++++++++
 .../drill/exec/store/parquet/Metadata.java      | 127 +++++++++++--------
 .../exec/store/parquet/ParquetFormatPlugin.java |   8 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  18 +--
 10 files changed, 202 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
index d8872dc..ed62c91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.drill.exec.store.dfs.MetadataContext;
 
 /**
  * Abstract base class for file system based partition descriptors and Hive partition descriptors.
@@ -65,7 +66,7 @@ public abstract class AbstractPartitionDescriptor implements PartitionDescriptor
 
   @Override
   public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
-      boolean isAllPruned) throws Exception {
+      boolean isAllPruned, MetadataContext metaContext) throws Exception {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 7796212..a04f34d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -211,7 +212,7 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
 
   @Override
   public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
-      boolean wasAllPartitionsPruned) throws Exception {
+      boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
     List<String> newFiles = Lists.newArrayList();
     for (final PartitionLocation location : newPartitionLocation) {
       if (!location.isCompositePartition()) {
@@ -228,6 +229,7 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
       final FormatSelection formatSelection = (FormatSelection)table.getSelection();
       final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
           cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
+      newFileSelection.setMetaContext(metaContext);
       final FileGroupScan newGroupScan =
           ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
       return new DrillScanRel(scanRel.getCluster(),
@@ -239,18 +241,19 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
                       true /*filter pushdown*/);
     } else if (scanRel instanceof EnumerableTableScan) {
       return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot,
-          wasAllPartitionsPruned);
+          wasAllPartitionsPruned, metaContext);
     } else {
       throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
     }
   }
 
   private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot,
-      boolean wasAllPartitionsPruned) {
+      boolean wasAllPartitionsPruned, MetadataContext metaContext) {
     final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
     final FormatSelection formatSelection = (FormatSelection) table.getSelection();
     final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
             cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
+    newFileSelection.setMetaContext(metaContext);
     final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
     final DrillTranslatableTable newTable = new DrillTranslatableTable(
             new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
@@ -265,7 +268,7 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
   @Override
   public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
       boolean wasAllPartitionsPruned) throws Exception {
-    return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
+    return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index 2c8ca95..534eb5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -81,9 +82,10 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   private GroupScan createNewGroupScan(List<String> newFiles, String cacheFileRoot,
-      boolean wasAllPartitionsPruned) throws IOException {
+      boolean wasAllPartitionsPruned, MetadataContext metaContext) throws IOException {
     final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(),
         cacheFileRoot, wasAllPartitionsPruned);
+    newSelection.setMetaContext(metaContext);
     final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
     return newScan;
   }
@@ -134,13 +136,13 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
 
   @Override
   public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
-      boolean wasAllPartitionsPruned) throws Exception {
+      boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
     List<String> newFiles = Lists.newArrayList();
     for (final PartitionLocation location : newPartitionLocation) {
       newFiles.add(location.getEntirePartitionLocation());
     }
 
-    final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned);
+    final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned, metaContext);
 
     return new DrillScanRel(scanRel.getCluster(),
         scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
@@ -154,7 +156,7 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
   @Override
   public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
       boolean wasAllPartitionsPruned) throws Exception {
-    return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
+    return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned, null);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 83cb146..daee249 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.vector.ValueVector;
 
 import java.util.BitSet;
@@ -90,11 +91,12 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
    * @param newPartitions
    * @param cacheFileRoot
    * @param wasAllPartitionsPruned
+   * @param metaContext
    * @return
    * @throws Exception
    */
   public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
-      boolean wasAllPartitionsPruned) throws Exception;
+      boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception;
 
   public boolean supportsMetadataCachePruning();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index d3e38fa..011c783 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -60,6 +60,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptRule;
@@ -406,8 +407,13 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
 
       }
 
+      final Object selection = getDrillTable(scanRel).getSelection();
+      MetadataContext metaContext = null;
+      if (selection instanceof FormatSelection) {
+           metaContext = ((FormatSelection)selection).getSelection().getMetaContext();
+      }
       RelNode inputRel = descriptor.supportsMetadataCachePruning() ?
-          descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) :
+          descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned, metaContext) :
             descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);
 
       if (projectRel != null) {
@@ -488,14 +494,18 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
 
   public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel);
 
+  private static DrillTable getDrillTable(final TableScan scan) {
+    DrillTable drillTable;
+    drillTable = scan.getTable().unwrap(DrillTable.class);
+    if (drillTable == null) {
+      drillTable = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+    }
+    return drillTable;
+  }
+
   private static boolean isQualifiedDirPruning(final TableScan scan) {
     if (scan instanceof EnumerableTableScan) {
-      DrillTable drillTable;
-      drillTable = scan.getTable().unwrap(DrillTable.class);
-      if (drillTable == null) {
-        drillTable = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
-      }
-      final Object selection = drillTable.getSelection();
+      final Object selection = getDrillTable(scan).getSelection();
       if (selection instanceof FormatSelection
           && ((FormatSelection)selection).supportDirPruning()) {
         return true;  // Do directory-based pruning in Calcite logical

http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 8904c82..3ab6277 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
@@ -30,7 +31,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-
+import com.google.common.collect.Maps;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.hadoop.fs.FileStatus;
@@ -56,6 +57,11 @@ public class FileSelection {
    */
   public final String cacheFileRoot;
 
+  /**
+   * metadata context useful for metadata operations (if any)
+   */
+  private MetadataContext metaContext = null;
+
   private enum StatusType {
     NOT_CHECKED,         // initial state
     NO_DIRS,             // no directories in this selection
@@ -106,6 +112,7 @@ public class FileSelection {
     this.selectionRoot = selection.selectionRoot;
     this.dirStatus = selection.dirStatus;
     this.cacheFileRoot = selection.cacheFileRoot;
+    this.metaContext = selection.metaContext;
     this.hadWildcard = selection.hadWildcard;
     this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
   }
@@ -124,7 +131,7 @@ public class FileSelection {
       }
       statuses = newStatuses;
     }
-    logger.debug("FileSelection.getStatuses() took {} ms, numFiles: {}",
+    logger.info("FileSelection.getStatuses() took {} ms, numFiles: {}",
         timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : statuses.size());
 
     return statuses;
@@ -328,6 +335,7 @@ public class FileSelection {
 
   public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection,
       final String cacheFileRoot) {
+    Stopwatch timer = Stopwatch.createStarted();
     final String root = selection.getSelectionRoot();
     if (Strings.isNullOrEmpty(root)) {
       throw new DrillRuntimeException("Selection root is null or empty" + root);
@@ -354,6 +362,7 @@ public class FileSelection {
     final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
     FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false);
     fileSel.setHadWildcard(selection.hadWildcard());
+    logger.info("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
     return fileSel;
   }
 
@@ -402,6 +411,14 @@ public class FileSelection {
     return cacheFileRoot;
   }
 
+  public void setMetaContext(MetadataContext context) {
+    metaContext = context;
+  }
+
+  public MetadataContext getMetaContext() {
+    return metaContext;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
new file mode 100644
index 0000000..17852ab
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A metadata context that holds state across multiple invocations of
+ * the Parquet metadata APIs.
+ */
+public class MetadataContext {
+
+  /** Map of directory path to the status of whether modification time was already checked.
+   *  Note: the #directories is typically a small percentage of the #files, so the memory footprint
+   *  is expected to be relatively small.
+   */
+  private Map<String, Boolean> dirModifCheckMap = Maps.newHashMap();
+
+  public MetadataContext() {
+  }
+
+  public void setStatus(String dir) {
+    dirModifCheckMap.put(dir,  true);
+  }
+
+  public void clearStatus(String dir) {
+    dirModifCheckMap.put(dir,  false);
+  }
+
+  public boolean getStatus(String dir) {
+    if (dirModifCheckMap.containsKey(dir)) {
+      return dirModifCheckMap.get(dir);
+    }
+    return false;
+  }
+
+  public void clear() {
+    dirModifCheckMap.clear();
+  }
+
+}
+
+
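
As a quick reference, the check-once contract of this new class, in a minimal snippet (the directory path is made up for illustration):

    MetadataContext ctx = new MetadataContext();
    boolean checked = ctx.getStatus("/data/t1/1994"); // false: not checked yet
    ctx.setStatus("/data/t1/1994");                   // record that the mod time was checked
    checked = ctx.getStatus("/data/t1/1994");         // true: caller may skip the re-check
    ctx.clearStatus("/data/t1/1994");                 // force a re-check of one directory
    ctx.clear();                                      // invalidate everything, done when new
                                                      // metadata cache files are written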

http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 45f7ca2..86b860a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -27,8 +27,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.store.TimedRunnable;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
+import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -81,6 +81,9 @@ public class Metadata {
 
   private final FileSystem fs;
 
+  private ParquetTableMetadataBase parquetTableMetadata;
+  private ParquetTableMetadataDirs parquetTableMetadataDirs;
+
   /**
    * Create the parquet metadata file for the directory at the given path, and for any subdirectories
    *
@@ -129,14 +132,16 @@ public class Metadata {
    * @return
    * @throws IOException
    */
-  public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path) throws IOException {
+  public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path, MetadataContext metaContext) throws IOException {
     Metadata metadata = new Metadata(fs);
-    return metadata.readBlockMeta(path);
+    metadata.readBlockMeta(path, false, metaContext);
+    return metadata.parquetTableMetadata;
   }
 
-  public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, String path) throws IOException {
+  public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, String path, MetadataContext metaContext) throws IOException {
     Metadata metadata = new Metadata(fs);
-    return metadata.readMetadataDirs(path);
+    metadata.readBlockMeta(path, true, metaContext);
+    return metadata.parquetTableMetadataDirs;
   }
 
   private Metadata(FileSystem fs) {
@@ -151,6 +156,7 @@ public class Metadata {
    */
   private Pair<ParquetTableMetadata_v2, ParquetTableMetadataDirs>
   createMetaFilesRecursively(final String path) throws IOException {
+    Stopwatch timer = Stopwatch.createStarted();
     List<ParquetFileMetadata_v2> metaDataList = Lists.newArrayList();
     List<String> directoryList = Lists.newArrayList();
     ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfoSet =
@@ -199,9 +205,13 @@ public class Metadata {
     if (directoryList.size() > 0 && childFiles.size() == 0) {
       ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
       writeFile(parquetTableMetadataDirs, new Path(p, METADATA_DIRECTORIES_FILENAME));
+      logger.info("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
+      timer.stop();
       return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
     }
     List<String> emptyDirList = Lists.newArrayList();
+    logger.info("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
+    timer.stop();
     return Pair.of(parquetTableMetadata, new ParquetTableMetadataDirs(emptyDirList));
   }
 
@@ -453,9 +463,12 @@ public class Metadata {
    * @return
    * @throws IOException
    */
-  private ParquetTableMetadataBase readBlockMeta(String path) throws IOException {
+  private void readBlockMeta(String path,
+      boolean dirsOnly,
+      MetadataContext metaContext) throws IOException {
     Stopwatch timer = Stopwatch.createStarted();
     Path p = new Path(path);
+    Path parentDir = p.getParent(); // parent directory of the metadata file
     ObjectMapper mapper = new ObjectMapper();
 
     final SimpleModule serialModule = new SimpleModule();
@@ -470,41 +483,38 @@ public class Metadata {
     mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     FSDataInputStream is = fs.open(p);
 
-    ParquetTableMetadataBase parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
-    logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
-    timer.stop();
-    if (tableModified(parquetTableMetadata, p)) {
-      parquetTableMetadata =
-          (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft();
-    }
-    return parquetTableMetadata;
-  }
-
-  private ParquetTableMetadataDirs readMetadataDirs(String path) throws IOException {
-    Stopwatch timer = Stopwatch.createStarted();
-    Path p = new Path(path);
-    ObjectMapper mapper = new ObjectMapper();
-
-    final SimpleModule serialModule = new SimpleModule();
-    serialModule.addDeserializer(SchemaPath.class, new SchemaPath.De());
-
-    AfterburnerModule module = new AfterburnerModule();
-    module.setUseOptimizedBeanDeserializer(true);
+    boolean alreadyCheckedModification = false;
+    boolean newMetadata = false;
 
-    mapper.registerModule(serialModule);
-    mapper.registerModule(module);
-    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    FSDataInputStream is = fs.open(p);
+    if (metaContext != null) {
+      alreadyCheckedModification = metaContext.getStatus(parentDir.toString());
+    }
 
-    ParquetTableMetadataDirs parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
-    logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
-    timer.stop();
+    if (dirsOnly) {
+      parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
+      logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
+      timer.stop();
+      if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), p, parentDir, metaContext)) {
+        parquetTableMetadataDirs =
+            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight();
+        newMetadata = true;
+      }
+    } else {
+      parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
+      logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
+      timer.stop();
+      if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), p, parentDir, metaContext)) {
+        parquetTableMetadata =
+            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft();
+        newMetadata = true;
+      }
+    }
 
-    if (tableModified(parquetTableMetadataDirs, p)) {
-      parquetTableMetadataDirs =
-          (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight();
+    if (newMetadata && metaContext != null) {
+      // if new metadata files were created, invalidate the existing metadata context
+      metaContext.clear();
     }
-    return parquetTableMetadataDirs;
+
   }
 
   /**
@@ -516,35 +526,42 @@ public class Metadata {
    * @return
    * @throws IOException
    */
-  private boolean tableModified(ParquetTableMetadataBase tableMetadata, Path metaFilePath)
+  private boolean tableModified(List<String> directories, Path metaFilePath,
+      Path parentDir,
+      MetadataContext metaContext)
       throws IOException {
-    long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
-    FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent());
-    if (directoryStatus.getModificationTime() > metaFileModifyTime) {
-      return true;
-    }
-    for (String directory : tableMetadata.getDirectories()) {
-      directoryStatus = fs.getFileStatus(new Path(directory));
-      if (directoryStatus.getModificationTime() > metaFileModifyTime) {
-        return true;
-      }
-    }
-    return false;
-  }
 
-  private boolean tableModified(ParquetTableMetadataDirs tableMetadataDirs, Path metaFilePath)
-      throws IOException {
+    Stopwatch timer = Stopwatch.createStarted();
+
+    if (metaContext != null) {
+      metaContext.setStatus(parentDir.toString());
+    }
     long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
-    FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent());
+    FileStatus directoryStatus = fs.getFileStatus(parentDir);
+    int numDirs = 1;
     if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+      logger.info("Directory {} was modified. Took {} ms to check modification time of {} directories", directoryStatus.getPath().toString(),
+          timer.elapsed(TimeUnit.MILLISECONDS),
+          numDirs);
+      timer.stop();
       return true;
     }
-    for (String directory : tableMetadataDirs.getDirectories()) {
+    for (String directory : directories) {
+      numDirs++;
+      if (metaContext != null) {
+        metaContext.setStatus(directory);
+      }
       directoryStatus = fs.getFileStatus(new Path(directory));
       if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+        logger.info("Directory {} was modified. Took {} ms to check modification time of {} directories", directoryStatus.getPath().toString(),
+            timer.elapsed(TimeUnit.MILLISECONDS),
+            numDirs);
+        timer.stop();
         return true;
       }
     }
+    logger.info("No directories were modified. Took {} ms to check modification time of {} directories", timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
+    timer.stop();
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 5174893..1ab621b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.store.mock.MockStorageEngine;
 import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataDirs;
 import org.apache.hadoop.conf.Configuration;
@@ -214,11 +215,16 @@ public class ParquetFormatPlugin implements FormatPlugin{
         // the directory is readable since the metadata 'directories' file cannot be created otherwise.  Note
         // that isDirReadable() does a similar check with the metadata 'cache' file.
         if (fs.exists(dirMetaPath)) {
-          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString());
+          // create a metadata context that will be used for the duration of the query for this table
+          MetadataContext metaContext = new MetadataContext();
+
+          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString(), metaContext);
           if (mDirs.getDirectories().size() > 0) {
             FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
                 selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */);
             dirSelection.setExpandedPartial();
+            dirSelection.setMetaContext(metaContext);
+
             return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
                 new FormatSelection(plugin.getConfig(), dirSelection));
           }

http://git-wip-us.apache.org/repos/asf/drill/blob/57dc9f43/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index f666439..21227e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
@@ -149,7 +150,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.selectionRoot = selectionRoot;
     this.cacheFileRoot = cacheFileRoot;
 
-    init();
+    init(null);
   }
 
   public ParquetGroupScan( //
@@ -176,7 +177,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       entries.add(new ReadEntryWithPath(fileName));
     }
 
-    init();
+    init(fileSelection.getMetaContext());
   }
 
   /*
@@ -570,7 +571,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     // we only select the files that are part of selection (by setting fileSet appropriately)
 
     // get (and set internal field) the metadata for the directory by reading the metadata file
-    this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString());
+    this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString(), selection.getMetaContext());
     List<FileStatus> fileStatuses = selection.getStatuses(fs);
 
     if (fileSet == null) {
@@ -586,7 +587,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     } else if (selection.isExpandedPartial() && !selection.hadWildcard() &&
         cacheFileRoot != null) {
-      this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString());
       if (selection.wasAllPartitionsPruned()) {
         // if all partitions were previously pruned, we only need to read 1 file (for the schema)
         fileSet.add(this.parquetTableMetadata.getFiles().get(0).getPath());
@@ -605,7 +605,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         if (status.isDirectory()) {
           //TODO [DRILL-4496] read the metadata cache files in parallel
           final Path metaPath = new Path(status.getPath(), Metadata.METADATA_FILENAME);
-          final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString());
+          final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString(), selection.getMetaContext());
           for (Metadata.ParquetFileMetadata file : metadata.getFiles()) {
             fileSet.add(file.getPath());
           }
@@ -641,7 +641,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return newSelection;
   }
 
-  private void init() throws IOException {
+  private void init(MetadataContext metaContext) throws IOException {
     if (entries.size() == 1 && parquetTableMetadata == null) {
       Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
       Path metaPath = null;
@@ -652,7 +652,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       }
       if (metaPath != null && fs.exists(metaPath)) {
         usedMetadataCache = true;
-        parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
+        parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext);
       } else {
         parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString());
       }
@@ -662,7 +662,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) {
         usedMetadataCache = true;
         if (parquetTableMetadata == null) {
-          parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
+          parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext);
         }
         if (fileSet != null) {
           parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
@@ -883,7 +883,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     ParquetGroupScan newScan = new ParquetGroupScan(this);
     newScan.modifyFileSelection(selection);
     newScan.cacheFileRoot = selection.cacheFileRoot;
-    newScan.init();
+    newScan.init(selection.getMetaContext());
     return newScan;
   }