Posted to commits@drill.apache.org by jn...@apache.org on 2016/01/30 05:38:32 UTC

[2/4] drill git commit: DRILL-2517: (Prototype from Mehant) Move directory based partition pruning to logical phase.

DRILL-2517: (Prototype from Mehant) Move directory based partition pruning to logical phase.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c7dfba2e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c7dfba2e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c7dfba2e

Branch: refs/heads/master
Commit: c7dfba2e62fc6b063e953bd9cb1fc5b2903edc79
Parents: 09de31e
Author: Mehant Baid <me...@gmail.com>
Authored: Tue Nov 10 22:26:26 2015 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jan 29 19:30:27 2016 -0800

----------------------------------------------------------------------
 .../HivePushPartitionFilterIntoScan.java        |   9 +-
 .../planner/FileSystemPartitionDescriptor.java  |  27 +-
 .../drill/exec/planner/logical/DrillTable.java  |   8 +-
 .../logical/partition/ParquetPruneScanRule.java |   9 +-
 .../logical/partition/PruneScanRule.java        | 268 +++++++++++++++++--
 .../drill/exec/store/dfs/FileSelection.java     |   4 +
 .../org/apache/drill/TestExampleQueries.java    |   7 +-
 7 files changed, 285 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
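
The diffstat above tells the story: the pruning rules are re-targeted from Drill's own rel nodes (DrillFilterRel, DrillScanRel) to Calcite's LogicalFilter, Project, and EnumerableTableScan, so that directory-based pruning can fire during logical planning, before the plan is converted to Drill rels. A minimal sketch of the two new operand patterns, built from the same helpers this commit uses (the holder class name is ours; rule bodies omitted):

    import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
    import org.apache.calcite.plan.RelOptRuleOperand;
    import org.apache.calcite.rel.core.Project;
    import org.apache.calcite.rel.logical.LogicalFilter;
    import org.apache.drill.exec.planner.logical.RelOptHelper;

    class LogicalPrunePatterns {
      // LogicalFilter over Project over a plain Calcite scan
      static final RelOptRuleOperand FILTER_ON_PROJECT =
          RelOptHelper.some(LogicalFilter.class,
              RelOptHelper.some(Project.class,
                  RelOptHelper.any(EnumerableTableScan.class)));

      // LogicalFilter sitting directly on the scan
      static final RelOptRuleOperand FILTER_ON_SCAN =
          RelOptHelper.some(LogicalFilter.class,
              RelOptHelper.any(EnumerableTableScan.class));
    }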


http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index fc2007e..17cd65a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -18,6 +18,7 @@
 
 package org.apache.drill.exec.planner.sql.logical;
 
+import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.PartitionDescriptor;
@@ -43,8 +44,8 @@ public abstract class HivePushPartitionFilterIntoScan {
         optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
-        return new HivePartitionDescriptor(settings, scanRel, getOptimizerRulesContext().getManagedBuffer(),
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+        return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
             defaultPartitionValue);
       }
 
@@ -77,8 +78,8 @@ public abstract class HivePushPartitionFilterIntoScan {
         "HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
-        return new HivePartitionDescriptor(settings, scanRel, getOptimizerRulesContext().getManagedBuffer(),
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+        return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
             defaultPartitionValue);
       }
 

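The signature change above is the hinge of the patch: PruneScanRule's abstract getPartitionDescriptor() callback (see that file further down) now takes any RelNode, so rules that still match on DrillScanRel, like these Hive rules and ParquetPruneScanRule below, must downcast. A hedged alternative to the bare cast, failing fast with a clear message (class and method names are hypothetical):

    import org.apache.calcite.rel.RelNode;
    import org.apache.drill.exec.planner.logical.DrillScanRel;

    final class ScanCasts {
      // Guards the downcast that the rules above perform inline.
      static DrillScanRel asDrillScan(RelNode scanRel) {
        if (!(scanRel instanceof DrillScanRel)) {
          throw new IllegalStateException(
              "expected a DrillScanRel, got " + scanRel.getClass().getName());
        }
        return (DrillScanRel) scanRel;
      }
    }
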
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index c4e4cb9..02658ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.util.BitSets;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -35,6 +37,7 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
@@ -50,12 +53,14 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
   private final String partitionLabel;
   private final int partitionLabelLength;
   private final Map<String, Integer> partitions = Maps.newHashMap();
-  private final DrillScanRel scanRel;
+  private final EnumerableTableScan scanRel;
+  private final DynamicDrillTable table;
 
-  public FileSystemPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+  public FileSystemPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
     this.partitionLabel = settings.getFsPartitionColumnLabel();
     this.partitionLabelLength = partitionLabel.length();
-    this.scanRel = scanRel;
+    this.scanRel = (EnumerableTableScan) scanRel;
+    table = scanRel.getTable().unwrap(DynamicDrillTable.class);
     for(int i =0; i < 10; i++){
       partitions.put(partitionLabel + i, i);
     }
@@ -84,9 +89,17 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
 
   @Override
   public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
-    final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation());
-    final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
+    /*
+    THIS NEEDS TO CHANGE. WE SHOULD RETURN AN ENUMERABLETABLESCAN??
+    final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true);
+    final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
     return newScan;
+    */
+    return null;
+  }
+
+  public DynamicDrillTable getTable() {
+    return table;
   }
 
   @Override
@@ -124,13 +137,13 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   private String getBaseTableLocation() {
-    final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
+    final FormatSelection origSelection = (FormatSelection) table.getSelection();
     return origSelection.getSelection().selectionRoot;
   }
 
   @Override
   protected void createPartitionSublists() {
-    List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles();
+    List<String> fileLocations = ((FormatSelection) table.getSelection()).getAsFiles();
     List<PartitionLocation> locations = new LinkedList<>();
     for (String file: fileLocations) {
       locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));

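Two things to note in this file: the descriptor now reaches table metadata by unwrapping a DynamicDrillTable from the Calcite RelOptTable rather than going through DrillScanRel, and the prototype stubs createNewGroupScan() out (returning null) because at the logical stage the replacement should be a new scan rel, not a GroupScan. A minimal sketch of the unwrap idiom, with the null check the prototype skips (helper names are ours):

    import org.apache.calcite.plan.RelOptTable;
    import org.apache.calcite.rel.RelNode;
    import org.apache.drill.exec.planner.logical.DynamicDrillTable;

    final class TableUnwrap {
      static DynamicDrillTable drillTableOf(RelNode scan) {
        RelOptTable optTable = scan.getTable(); // null when the rel is not a scan
        DynamicDrillTable table =
            optTable == null ? null : optTable.unwrap(DynamicDrillTable.class);
        if (table == null) {
          throw new IllegalStateException("scan is not over a Drill table: " + scan);
        }
        return table;
      }
    }
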
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 106290d..1cb83b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -85,14 +85,14 @@ public abstract class DrillTable implements Table {
     return selection;
   }
 
-  public void modifySelection(Object selection) {
-    this.selection = selection;
-  }
-
   public String getStorageEngineName() {
     return storageEngineName;
   }
 
+  public String getUserName() {
+    return userName;
+  }
+
   @Override
   public Statistic getStatistic() {
     return Statistics.UNKNOWN;

http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
index b4f4e95..d92d2b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.logical.partition;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
@@ -41,8 +42,8 @@ public class ParquetPruneScanRule {
         optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
-        return new ParquetPartitionDescriptor(settings, scanRel);
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+        return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
       }
 
       @Override
@@ -73,8 +74,8 @@ public class ParquetPruneScanRule {
         "PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
-        return new ParquetPartitionDescriptor(settings, scanRel);
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+        return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 4cc9c46..bfd6597 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -25,12 +25,14 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.BitSets;
 
 import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionFunction;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -53,13 +55,15 @@ import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
-import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptRule;
@@ -72,6 +76,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.drill.exec.vector.ValueVector;
 
+
 public abstract class PruneScanRule extends StoragePluginOptimizerRule {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);
 
@@ -84,71 +89,280 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
 
   public static final RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
     return new PruneScanRule(
-        RelOptHelper.some(Filter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
+        RelOptHelper.some(LogicalFilter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
         "PruneScanRule:Filter_On_Project",
         optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
         return new FileSystemPartitionDescriptor(settings, scanRel);
       }
 
       @Override
       public boolean matches(RelOptRuleCall call) {
+        /*
         final DrillScanRel scan = (DrillScanRel) call.rel(2);
         GroupScan groupScan = scan.getGroupScan();
         // this rule is applicable only for dfs based partition pruning
-        if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
-          return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
-        } else {
-          return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
-        }
+        return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+        */
+        return true;
       }
 
       @Override
       public void onMatch(RelOptRuleCall call) {
-        final FilterRel filterRel = (DrillFilterRel) call.rel(0);
-        final ProjectRel projectRel = (DrillProjectRel) call.rel(1);
-        final ScanRel scanRel = (DrillScanRel) call.rel(2);
-        doOnMatch(call, filterRel, projectRel, scanRel);
+        final LogicalFilter filterRel = call.rel(0);
+        final Project projectRel = call.rel(1);
+        final EnumerableTableScan scanRel = call.rel(2);
+        doOnMatchLogical(call, filterRel, projectRel, scanRel);
       }
     };
   }
 
   public static final RelOptRule getFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
     return new PruneScanRule(
-        RelOptHelper.some(Filter.class, RelOptHelper.any(EnumerableTableScan.class)),
+        RelOptHelper.some(LogicalFilter.class, RelOptHelper.any(EnumerableTableScan.class)),
         "PruneScanRule:Filter_On_Scan", optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
         return new FileSystemPartitionDescriptor(settings, scanRel);
       }
 
       @Override
       public boolean matches(RelOptRuleCall call) {
-        final DrillScanRel scan = (DrillScanRel) call.rel(1);
+        /* final DrillScanRel scan = (DrillScanRel) call.rel(1);
         GroupScan groupScan = scan.getGroupScan();
         // this rule is applicable only for dfs based partition pruning
-        if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
-          return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
-        } else {
-          return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
-        }
+        return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+        */
+        return true;
       }
 
       @Override
       public void onMatch(RelOptRuleCall call) {
-        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-        final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
-        doOnMatch(call, filterRel, null, scanRel);
+        final LogicalFilter filterRel = call.rel(0);
+        final EnumerableTableScan scanRel = call.rel(1);
+        doOnMatchLogical(call, filterRel, null, scanRel);
       }
     };
   }
 
-  protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel,  DrillScanRel scanRel) {
-    final DrillTable table = scanRel.getTable().unwrap(DrillTable.class);
+  // TODO: Combine the doOnMatch and doOnMatchLogical
+  protected void doOnMatchLogical(RelOptRuleCall call, LogicalFilter filterRel, Project projectRel, EnumerableTableScan scanRel) {
+    final String pruningClassName = getClass().getName();
+    logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
+    Stopwatch totalPruningTime = new Stopwatch();
+    totalPruningTime.start();
+
+
+    final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
+    final BufferAllocator allocator = optimizerContext.getAllocator();
+
+
+    RexNode condition = null;
+    if (projectRel == null) {
+      condition = filterRel.getCondition();
+    } else {
+      // get the filter as if it were below the projection.
+      condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
+    }
+
+    RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
+    condition = condition.accept(visitor);
+
+    Map<Integer, String> fieldNameMap = Maps.newHashMap();
+    List<String> fieldNames = scanRel.getRowType().getFieldNames();
+    BitSet columnBitset = new BitSet();
+    BitSet partitionColumnBitSet = new BitSet();
+
+    int relColIndex = 0;
+    for (String field : fieldNames) {
+      final Integer partitionIndex = descriptor.getIdIfValid(field);
+      if (partitionIndex != null) {
+        fieldNameMap.put(partitionIndex, field);
+        partitionColumnBitSet.set(partitionIndex);
+        columnBitset.set(relColIndex);
+      }
+      relColIndex++;
+    }
+
+    if (partitionColumnBitSet.isEmpty()) {
+      logger.info("No partition columns are projected from the scan..continue. " +
+          "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      return;
+    }
+
+    // stop watch to track how long we spend in different phases of pruning
+    Stopwatch miscTimer = new Stopwatch();
+
+    // track how long we spend building the filter tree
+    miscTimer.start();
+
+    FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
+    c.analyze(condition);
+    RexNode pruneCondition = c.getFinalCondition();
+
+    logger.info("Total elapsed time to build and analyze filter tree: {} ms",
+        miscTimer.elapsed(TimeUnit.MILLISECONDS));
+    miscTimer.reset();
+
+    if (pruneCondition == null) {
+      logger.info("No conditions were found eligible for partition pruning." +
+          "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      return;
+    }
+
+    // set up the partitions
+    List<String> newFiles = Lists.newArrayList();
+    long numTotal = 0; // total number of partitions
+    int batchIndex = 0;
+    String firstLocation = null;
+    LogicalExpression materializedExpr = null;
+
+    // Outer loop: iterate over a list of batches of PartitionLocations
+    for (List<PartitionLocation> partitions : descriptor) {
+      numTotal += partitions.size();
+      logger.debug("Evaluating partition pruning for batch {}", batchIndex);
+      if (batchIndex == 0) { // save the first location in case everything is pruned
+        firstLocation = partitions.get(0).getEntirePartitionLocation();
+      }
+      final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
+      final VectorContainer container = new VectorContainer();
+
+      try {
+        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
+        for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
+          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+          MajorType type = descriptor.getVectorType(column, settings);
+          MaterializedField field = MaterializedField.create(column, type);
+          ValueVector v = TypeHelper.getNewVector(field, allocator);
+          v.allocateNew();
+          vectors[partitionColumnIndex] = v;
+          container.add(v);
+        }
+
+        // track how long we spend populating partition column vectors
+        miscTimer.start();
+
+        // populate partition vectors.
+        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
+
+        logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
+            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+        miscTimer.reset();
+
+        // materialize the expression; only need to do this once
+        if (batchIndex == 0) {
+          materializedExpr = materializePruneExpr(pruneCondition, settings, scanRel, container);
+          if (materializedExpr == null) {
+            // continue without partition pruning; no need to log anything here since
+            // materializePruneExpr logs it already
+            logger.info("Total pruning elapsed time: {} ms",
+                totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+            return;
+          }
+        }
+
+        output.allocateNew(partitions.size());
+
+        // start the timer to evaluate how long we spend in the interpreter evaluation
+        miscTimer.start();
+
+        InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
+
+        logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {}",
+            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+        miscTimer.reset();
+
+        int recordCount = 0;
+        int qualifiedCount = 0;
+
+        // Inner loop: within each batch iterate over the PartitionLocations
+        for(PartitionLocation part: partitions){
+          if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
+            newFiles.add(part.getEntirePartitionLocation());
+            qualifiedCount++;
+          }
+          recordCount++;
+        }
+        logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
+        batchIndex++;
+      } catch (Exception e) {
+        logger.warn("Exception while trying to prune partition.", e);
+        logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+        return; // continue without partition pruning
+      } finally {
+        container.clear();
+        if (output != null) {
+          output.clear();
+        }
+      }
+    }
+
+    try {
+
+      boolean canDropFilter = true;
+
+      if (newFiles.isEmpty()) {
+        assert firstLocation != null;
+        newFiles.add(firstLocation);
+        canDropFilter = false;
+      }
+
+      if (newFiles.size() == numTotal) {
+        logger.info("No partitions were eligible for pruning");
+        return;
+      }
+
+      logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size());
+
+      List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
+      List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
+      conjuncts.removeAll(pruneConjuncts);
+      RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
+
+      RewriteCombineBinaryOperators reverseVisitor = new RewriteCombineBinaryOperators(true, filterRel.getCluster().getRexBuilder());
+
+      condition = condition.accept(reverseVisitor);
+      pruneCondition = pruneCondition.accept(reverseVisitor);
+
+      RelOptTableImpl t = (RelOptTableImpl) scanRel.getTable();
+      DynamicDrillTable oldTable = ((FileSystemPartitionDescriptor) descriptor).getTable();
+      FormatSelection formatSelection = (FormatSelection) oldTable.getSelection();
+      FileSelection oldFileSelection = formatSelection.getSelection();
+      FileSelection newFileSelection = new FileSelection(newFiles, oldFileSelection.selectionRoot, oldFileSelection.getParquetMetadata(), oldFileSelection.getFileStatuses());
+      FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
+      DynamicDrillTable newTable = new DynamicDrillTable(oldTable.getPlugin(), oldTable.getStorageEngineName(),
+          oldTable.getUserName(), newFormatSelection);
+      RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable);
+
+      // TODO: The new scan should come from the PartitionDescriptor
+      // TODO: Update the PartitionDescriptor to return ScanRel instead of the GroupScan 
+      EnumerableTableScan newScan = EnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl);
+
+      RelNode inputRel = newScan;
+
+      if (projectRel != null) {
+        inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
+      }
+
+      if (newCondition.isAlwaysTrue() && canDropFilter) {
+        call.transformTo(inputRel);
+      } else {
+        final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
+        call.transformTo(newFilter);
+      }
+
+    } catch (Exception e) {
+      logger.warn("Exception while using the pruned partitions.", e);
+    } finally {
+      logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+    }
+  }
 
+  protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
     final String pruningClassName = getClass().getName();
     logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
     Stopwatch totalPruningTime = new Stopwatch();
@@ -388,5 +602,5 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     return optimizerContext;
   }
 
-  public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel);
+  public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel);
 }

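The new machinery at the end of doOnMatchLogical() is the heart of the prototype. Rather than cloning a GroupScan, it rebuilds the scan from the metadata layer up: starting from the surviving file list it creates a pruned FileSelection, a FormatSelection wrapping it, a new DynamicDrillTable, and finally a fresh EnumerableTableScan over a re-created RelOptTableImpl. A condensed sketch of that step, mirroring the calls used above (class and method names are ours; error handling omitted):

    import java.util.List;

    import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
    import org.apache.calcite.prepare.RelOptTableImpl;
    import org.apache.drill.exec.planner.logical.DynamicDrillTable;
    import org.apache.drill.exec.store.dfs.FileSelection;
    import org.apache.drill.exec.store.dfs.FormatSelection;

    final class PrunedScanBuilder {
      /** Re-creates the scan so it covers only the files that survived pruning. */
      static EnumerableTableScan prunedScan(EnumerableTableScan scan,
                                            DynamicDrillTable oldTable,
                                            List<String> newFiles) {
        FormatSelection oldFormat = (FormatSelection) oldTable.getSelection();
        FileSelection oldFiles = oldFormat.getSelection();
        // Pruned selection keeps the original root, metadata, and statuses.
        FileSelection newSelection = new FileSelection(newFiles,
            oldFiles.selectionRoot, oldFiles.getParquetMetadata(),
            oldFiles.getFileStatuses());
        FormatSelection newFormat =
            new FormatSelection(oldFormat.getFormat(), newSelection);
        DynamicDrillTable newTable = new DynamicDrillTable(oldTable.getPlugin(),
            oldTable.getStorageEngineName(), oldTable.getUserName(), newFormat);
        RelOptTableImpl oldOpt = (RelOptTableImpl) scan.getTable();
        return EnumerableTableScan.create(scan.getCluster(),
            RelOptTableImpl.create(oldOpt.getRelOptSchema(), oldOpt.getRowType(),
                newTable));
      }
    }
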
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 6df3ffc..0d49b5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -242,4 +242,8 @@ public class FileSelection {
     }
   }
 
+  public List<FileStatus> getFileStatuses() {
+    return statuses;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 0b52b9e..0ca6bb9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -1019,7 +1019,7 @@ public class TestExampleQueries extends BaseTestQuery {
         .sqlQuery(query)
         .expectsEmptyResultSet()
         .optionSettingQueriesForTestQuery("ALTER SESSION SET `planner.enable_hashjoin` = false; " +
-                                          "ALTER SESSION SET `planner.disable_exchanges` = true")
+            "ALTER SESSION SET `planner.disable_exchanges` = true")
         .build()
         .run();
 
@@ -1194,4 +1194,9 @@ public class TestExampleQueries extends BaseTestQuery {
         .build()
         .run();
   }
+
+  @Test
+  public void testDirectoryPruningPlanInLogicalPhase() throws Exception {
+    test("explain plan for select a from dfs.tmp.foo where dir0 = 1");
+  }
 }
\ No newline at end of file