Posted to commits@drill.apache.org by jn...@apache.org on 2016/01/30 05:38:33 UTC

[3/4] drill git commit: DRILL-2517: Move directory-based partition pruning to Calcite logical planning phase.

DRILL-2517: Move directory-based partition pruning to Calcite logical planning phase.

1) Make the directory-based pruning rule work in both the Calcite logical and Drill logical planning phases.

2) Apply directory-based pruning in the Calcite logical phase only when there is no Parquet metadata cache.

3) Make the FileSelection constructor public, since FileSelection.create() would modify selectionRoot.
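
For readers skimming the diff: the central refactoring replaces PartitionDescriptor.createNewGroupScan() with createTableScan(), so each descriptor now returns a complete scan rel instead of just a GroupScan. The rewrite step inside the rule collapses to a single call; the sketch below is assembled from the PruneScanRule hunks further down, not new code:

    // After pruning computes the surviving files, the descriptor builds the replacement
    // scan rel (a DrillScanRel in the Drill logical phase, an EnumerableTableScan in the
    // Calcite logical phase), and the rule re-stacks the project/filter on top of it.
    RelNode inputRel = descriptor.createTableScan(newFiles);
    if (projectRel != null) {
      inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
    }
    if (newCondition.isAlwaysTrue() && canDropFilter) {
      call.transformTo(inputRel);
    } else {
      call.transformTo(filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel)));
    }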


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9b4008dc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9b4008dc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9b4008dc

Branch: refs/heads/master
Commit: 9b4008dc384bc5e0b9af9a048bfc93c5bcb902b2
Parents: c7dfba2
Author: Jinfeng Ni <jn...@apache.org>
Authored: Fri Jan 8 10:28:53 2016 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jan 29 19:30:27 2016 -0800

----------------------------------------------------------------------
 .../planner/sql/HivePartitionDescriptor.java    |  56 +--
 .../HivePushPartitionFilterIntoScan.java        |  15 +-
 .../planner/FileSystemPartitionDescriptor.java  |  81 ++++-
 .../planner/ParquetPartitionDescriptor.java     |  17 +-
 .../drill/exec/planner/PartitionDescriptor.java |  12 +-
 .../planner/logical/DrillPushProjIntoScan.java  |   8 +-
 .../exec/planner/logical/DrillRuleSets.java     |  21 +-
 .../logical/partition/ParquetPruneScanRule.java |  19 +-
 .../logical/partition/PruneScanRule.java        | 346 ++++---------------
 .../planner/sql/handlers/DefaultSqlHandler.java |   9 +
 .../drill/exec/store/dfs/FileSelection.java     |   6 +-
 .../drill/exec/store/dfs/FormatSelection.java   |   4 +
 .../store/parquet/ParquetFileSelection.java     |   4 +
 .../org/apache/drill/TestExampleQueries.java    |   4 -
 .../org/apache/drill/TestPartitionFilter.java   |  16 +
 .../1995/Q1/orders_95_q1.parquet                | Bin 0 -> 2180 bytes
 .../1996/Q1/badFormat.parquet                   |   1 +
 17 files changed, 281 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index e1eb25e..e531f38 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.sql;
 
 import io.netty.buffer.DrillBuf;
 
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.util.BitSets;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -27,6 +28,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.AbstractPartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionLocation;
+import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.hive.HiveUtilities;
@@ -90,27 +92,6 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   @Override
-  public GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException {
-    HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
-    HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
-    List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
-    List<HiveTable.HivePartition> newPartitions = new LinkedList<>();
-
-    for (HiveTable.HivePartition part: oldPartitions) {
-      String partitionLocation = part.getPartition().getSd().getLocation();
-      for (String newPartitionLocation: newFiles) {
-        if (partitionLocation.equals(newPartitionLocation)) {
-          newPartitions.add(part);
-        }
-      }
-    }
-
-    HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions);
-
-    return hiveScan.clone(newReadEntry);
-  }
-
-  @Override
   public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions,
                                        BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
     int record = 0;
@@ -169,4 +150,37 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
     sublistsCreated = true;
   }
 
+  @Override
+  public TableScan createTableScan(List<String> newPartitions) throws Exception {
+    GroupScan newGroupScan = createNewGroupScan(newPartitions);
+    return new DrillScanRel(scanRel.getCluster(),
+        scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+        scanRel.getTable(),
+        newGroupScan,
+        scanRel.getRowType(),
+        scanRel.getColumns(),
+        true /*filter pushdown*/);
+  }
+
+  private GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException {
+    HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
+    HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
+    List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
+    List<HiveTable.HivePartition> newPartitions = new LinkedList<>();
+
+    for (HiveTable.HivePartition part: oldPartitions) {
+      String partitionLocation = part.getPartition().getSd().getLocation();
+      for (String newPartitionLocation: newFiles) {
+        if (partitionLocation.equals(newPartitionLocation)) {
+          newPartitions.add(part);
+        }
+      }
+    }
+
+    HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions);
+
+    return hiveScan.clone(newReadEntry);
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index 17cd65a..0bdfe99 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -19,6 +19,7 @@
 package org.apache.drill.exec.planner.sql.logical;
 
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.PartitionDescriptor;
@@ -44,7 +45,7 @@ public abstract class HivePushPartitionFilterIntoScan {
         optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
         return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
             defaultPartitionValue);
       }
@@ -63,9 +64,9 @@ public abstract class HivePushPartitionFilterIntoScan {
 
       @Override
       public void onMatch(RelOptRuleCall call) {
-        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-        final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
-        final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+        final DrillFilterRel filterRel =  call.rel(0);
+        final DrillProjectRel projectRel = call.rel(1);
+        final DrillScanRel scanRel = call.rel(2);
         doOnMatch(call, filterRel, projectRel, scanRel);
       }
     };
@@ -78,7 +79,7 @@ public abstract class HivePushPartitionFilterIntoScan {
         "HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
         return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
             defaultPartitionValue);
       }
@@ -97,8 +98,8 @@ public abstract class HivePushPartitionFilterIntoScan {
 
       @Override
       public void onMatch(RelOptRuleCall call) {
-        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-        final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+        final DrillFilterRel filterRel = call.rel(0);
+        final DrillScanRel scanRel = call.rel(1);
         doOnMatch(call, filterRel, null, scanRel);
       }
     };

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 02658ed..04a3f97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -24,11 +24,14 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.util.BitSets;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -36,7 +39,10 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -53,14 +59,22 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
   private final String partitionLabel;
   private final int partitionLabelLength;
   private final Map<String, Integer> partitions = Maps.newHashMap();
-  private final EnumerableTableScan scanRel;
-  private final DynamicDrillTable table;
+  private final TableScan scanRel;
+  private final DrillTable table;
 
-  public FileSystemPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+  public FileSystemPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+    Preconditions.checkArgument(scanRel instanceof DrillScanRel || scanRel instanceof EnumerableTableScan);
     this.partitionLabel = settings.getFsPartitionColumnLabel();
     this.partitionLabelLength = partitionLabel.length();
-    this.scanRel = (EnumerableTableScan) scanRel;
-    table = scanRel.getTable().unwrap(DynamicDrillTable.class);
+    this.scanRel = scanRel;
+    DrillTable unwrap;
+    unwrap = scanRel.getTable().unwrap(DrillTable.class);
+    if (unwrap == null) {
+      unwrap = scanRel.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+    }
+
+    table = unwrap;
+
     for(int i =0; i < 10; i++){
       partitions.put(partitionLabel + i, i);
     }
@@ -87,18 +101,18 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
     return MAX_NESTED_SUBDIRS;
   }
 
-  @Override
-  public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
-    /*
-    THIS NEEDS TO CHANGE. WE SHOULD RETURN A ENUMERABLETABLESCAN??
-    final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true);
-    final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
-    return newScan;
-    */
-    return null;
-  }
-
-  public DynamicDrillTable getTable() {
+//  @Override
+//  public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
+//    if (scanRel instanceof DrillScanRel) {
+//      final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+//      final FileGroupScan newScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+//      return newScan;
+//    } else {
+//      throw new UnsupportedOperationException("Does not allow to get groupScan for EnumerableTableScan");
+//    }
+//  }
+
+  public DrillTable getTable() {
     return table;
   }
 
@@ -152,4 +166,37 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
     sublistsCreated = true;
   }
 
+  @Override
+  public TableScan createTableScan(List<String> newFiles) throws Exception {
+    if (scanRel instanceof DrillScanRel) {
+      final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+      final FileGroupScan newGroupScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+      return new DrillScanRel(scanRel.getCluster(),
+                      scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+                      scanRel.getTable(),
+                      newGroupScan,
+                      scanRel.getRowType(),
+                      ((DrillScanRel) scanRel).getColumns(),
+                      true /*filter pushdown*/);
+    } else if (scanRel instanceof EnumerableTableScan) {
+      return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles);
+    } else {
+      throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
+    }
+  }
+
+  private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles) {
+    final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
+    final FormatSelection formatSelection = (FormatSelection) table.getSelection();
+    final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+    final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
+    final DrillTranslatableTable newTable = new DrillTranslatableTable(
+            new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
+            table.getUserName(),
+            newFormatSelection));
+    final RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable);
+
+    return EnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl);
+  }
+
 }
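
A note on the constructor change above: because the descriptor can now be handed either rel type, the DrillTable may sit directly in the scan's RelOptTable (Drill logical rels) or be wrapped in a DrillTranslatableTable (Calcite logical rels). The same two-step unwrap idiom recurs in DrillPushProjIntoScan and in PruneScanRule.isQualifiedDirPruning() below:

    // Sketch of the shared idiom; scan is any TableScan over a Drill table.
    DrillTable table = scan.getTable().unwrap(DrillTable.class);
    if (table == null) {
      // Calcite logical rels carry the DrillTable inside a DrillTranslatableTable.
      table = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
    }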

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index cda5a5e..81bcf03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -17,11 +17,13 @@
  */
 package org.apache.drill.exec.planner;
 
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.util.BitSets;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -78,8 +80,7 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
     return partitionColumns.size();
   }
 
-  @Override
-  public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
+  private GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
     final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation());
     final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
     return newScan;
@@ -128,4 +129,16 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
     sublistsCreated = true;
   }
 
+  @Override
+  public TableScan createTableScan(List<String> newFiles) throws Exception {
+    final GroupScan newGroupScan = createNewGroupScan(newFiles);
+
+    return new DrillScanRel(scanRel.getCluster(),
+        scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+        scanRel.getTable(),
+        newGroupScan,
+        scanRel.getRowType(),
+        scanRel.getColumns(),
+        true /*filter pushdown*/);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 726d8bc..dd3b084 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner;
 
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.GroupScan;
@@ -53,8 +54,6 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
   // Maximum level of partition nesting/ hierarchy supported
   public int getMaxHierarchyLevel();
 
-  public GroupScan createNewGroupScan(List<String> newFiles) throws Exception;
-
   /**
    * Method creates an in memory representation of all the partitions. For each level of partitioning we
    * will create a value vector which this method will populate for all the partitions with the values of the
@@ -74,4 +73,13 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
    * @return
    */
   TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings);
+
+  /**
+   * Creates a new TableScan rel node, given a list of new partitions or new files to scan.
+   * @param newPartitions the partition locations (or file paths) that survived pruning
+   * @return a new TableScan rel node that reads only the given partitions
+   * @throws Exception if the new scan cannot be constructed
+   */
+  public TableScan createTableScan(List<String> newPartitions) throws Exception;
+
 }
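
The net interface change, summarized side by side (both signatures are taken verbatim from the hunks above):

    // Before: the rule asked only for a GroupScan and built the DrillScanRel itself,
    // which tied partition pruning to the Drill logical planning phase.
    GroupScan createNewGroupScan(List<String> newFiles) throws Exception;   // removed

    // After: the descriptor returns a complete TableScan rel, so it is free to produce
    // either a DrillScanRel (Drill logical) or an EnumerableTableScan (Calcite logical).
    TableScan createTableScan(List<String> newPartitions) throws Exception; // added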

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
index 1fd1cd7..33c840b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
@@ -54,8 +54,14 @@ public class DrillPushProjIntoScan extends RelOptRule {
     try {
       ProjectPushInfo columnInfo = PrelUtil.getColumns(scan.getRowType(), proj.getProjects());
 
+      // get DrillTable, either wrapped in RelOptTable, or DrillTranslatableTable.
+      DrillTable table = scan.getTable().unwrap(DrillTable.class);
+      if (table == null) {
+        table = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+      }
+
       if (columnInfo == null || columnInfo.isStarQuery() //
-          || !scan.getTable().unwrap(DrillTable.class) //
+          || !table //
           .getGroupScan().canPushdownProjects(columnInfo.columns)) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 6f1f995..d9609d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -204,8 +204,8 @@ public class DrillRuleSets {
   public static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
     final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
         .add(
-            PruneScanRule.getFilterOnProject(optimizerRulesContext),
-            PruneScanRule.getFilterOnScan(optimizerRulesContext),
+            PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
+            PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
             ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
             ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext)
         )
@@ -214,6 +214,23 @@ public class DrillRuleSets {
     return new DrillRuleSet(pruneRules);
   }
 
+  /**
+   * Get an immutable list of directory-based partition pruning rules that will be used in Calcite logical planning.
+   * @param optimizerRulesContext the context holding the optimizer rule settings
+   * @return a RuleSet containing the directory-based pruning rules
+   */
+  public static RuleSet getDirPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
+    final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
+        .add(
+            PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
+            PruneScanRule.getDirFilterOnScan(optimizerRulesContext)
+        )
+        .build();
+
+    return new DrillRuleSet(pruneRules);
+
+  }
+
   // Ruleset for join permutation, used only in VolcanoPlanner.
   public static RuleSet getJoinPermRules(OptimizerRulesContext optimizerRulesContext) {
     return new DrillRuleSet(ImmutableSet.<RelOptRule> builder().add( //

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
index d92d2b0..f5dbb9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.logical.partition;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
@@ -42,13 +43,13 @@ public class ParquetPruneScanRule {
         optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
         return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
       }
 
       @Override
       public boolean matches(RelOptRuleCall call) {
-        final DrillScanRel scan = (DrillScanRel) call.rel(2);
+        final DrillScanRel scan = call.rel(2);
         GroupScan groupScan = scan.getGroupScan();
         // this rule is applicable only for parquet based partition pruning
         if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
@@ -60,9 +61,9 @@ public class ParquetPruneScanRule {
 
       @Override
       public void onMatch(RelOptRuleCall call) {
-        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-        final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
-        final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+        final DrillFilterRel filterRel =  call.rel(0);
+        final DrillProjectRel projectRel = call.rel(1);
+        final DrillScanRel scanRel =  call.rel(2);
         doOnMatch(call, filterRel, projectRel, scanRel);
       }
     };
@@ -74,13 +75,13 @@ public class ParquetPruneScanRule {
         "PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) {
 
       @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
         return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
       }
 
       @Override
       public boolean matches(RelOptRuleCall call) {
-        final DrillScanRel scan = (DrillScanRel) call.rel(1);
+        final DrillScanRel scan = call.rel(1);
         GroupScan groupScan = scan.getGroupScan();
         // this rule is applicable only for parquet based partition pruning
         if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
@@ -92,8 +93,8 @@ public class ParquetPruneScanRule {
 
       @Override
       public void onMatch(RelOptRuleCall call) {
-        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-        final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+        final DrillFilterRel filterRel = call.rel(0);
+        final DrillScanRel scanRel = call.rel(1);
         doOnMatch(call, filterRel, null, scanRel);
       }
     };

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index bfd6597..aefd247 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -25,8 +25,12 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.jdbc.CalciteAbstractSchema;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.BitSets;
@@ -55,6 +59,8 @@ import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -64,6 +70,7 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.parquet.ParquetFileSelection;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptRule;
@@ -87,282 +94,64 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     this.optimizerContext = optimizerContext;
   }
 
-  public static final RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
-    return new PruneScanRule(
-        RelOptHelper.some(LogicalFilter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
-        "PruneScanRule:Filter_On_Project",
-        optimizerRulesContext) {
-
-      @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
-        return new FileSystemPartitionDescriptor(settings, scanRel);
-      }
-
-      @Override
-      public boolean matches(RelOptRuleCall call) {
-        /*
-        final DrillScanRel scan = (DrillScanRel) call.rel(2);
-        GroupScan groupScan = scan.getGroupScan();
-        // this rule is applicable only for dfs based partition pruning
-        return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
-        */
-        return true;
-      }
-
-      @Override
-      public void onMatch(RelOptRuleCall call) {
-        final LogicalFilter filterRel = call.rel(0);
-        final Project projectRel = call.rel(1);
-        final EnumerableTableScan scanRel = call.rel(2);
-        doOnMatchLogical(call, filterRel, projectRel, scanRel);
-      }
-    };
-  }
-
-  public static final RelOptRule getFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
-    return new PruneScanRule(
-        RelOptHelper.some(LogicalFilter.class, RelOptHelper.any(EnumerableTableScan.class)),
-        "PruneScanRule:Filter_On_Scan", optimizerRulesContext) {
-
-      @Override
-      public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
-        return new FileSystemPartitionDescriptor(settings, scanRel);
-      }
-
-      @Override
-      public boolean matches(RelOptRuleCall call) {
-        /* final DrillScanRel scan = (DrillScanRel) call.rel(1);
-        GroupScan groupScan = scan.getGroupScan();
-        // this rule is applicable only for dfs based partition pruning
-        return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
-        */
-        return true;
-      }
-
-      @Override
-      public void onMatch(RelOptRuleCall call) {
-        final LogicalFilter filterRel = call.rel(0);
-        final EnumerableTableScan scanRel = call.rel(1);
-        doOnMatchLogical(call, filterRel, null, scanRel);
-      }
-    };
-  }
-
-  // TODO: Combine the doOnMatch and doOnMatchLogical
-  protected void doOnMatchLogical(RelOptRuleCall call, LogicalFilter filterRel, Project projectRel, EnumerableTableScan scanRel) {
-    final String pruningClassName = getClass().getName();
-    logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
-    Stopwatch totalPruningTime = new Stopwatch();
-    totalPruningTime.start();
-
-
-    final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
-    final BufferAllocator allocator = optimizerContext.getAllocator();
-
-
-    RexNode condition = null;
-    if (projectRel == null) {
-      condition = filterRel.getCondition();
-    } else {
-      // get the filter as if it were below the projection.
-      condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
+  private static class DirPruneScanFilterOnProjectRule extends PruneScanRule {
+    public DirPruneScanFilterOnProjectRule(OptimizerRulesContext optimizerRulesContext) {
+      super(RelOptHelper.some(Filter.class, RelOptHelper.some(Project.class, RelOptHelper.any(TableScan.class))), "DirPruneScanRule:Filter_On_Project", optimizerRulesContext);
     }
 
-    RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
-    condition = condition.accept(visitor);
-
-    Map<Integer, String> fieldNameMap = Maps.newHashMap();
-    List<String> fieldNames = scanRel.getRowType().getFieldNames();
-    BitSet columnBitset = new BitSet();
-    BitSet partitionColumnBitSet = new BitSet();
-
-    int relColIndex = 0;
-    for (String field : fieldNames) {
-      final Integer partitionIndex = descriptor.getIdIfValid(field);
-      if (partitionIndex != null) {
-        fieldNameMap.put(partitionIndex, field);
-        partitionColumnBitSet.set(partitionIndex);
-        columnBitset.set(relColIndex);
-      }
-      relColIndex++;
+    @Override
+    public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+      return new FileSystemPartitionDescriptor(settings, scanRel);
     }
 
-    if (partitionColumnBitSet.isEmpty()) {
-      logger.info("No partition columns are projected from the scan..continue. " +
-          "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
-      return;
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final TableScan scan = call.rel(2);
+      return isQualifiedDirPruning(scan);
     }
 
-    // stop watch to track how long we spend in different phases of pruning
-    Stopwatch miscTimer = new Stopwatch();
-
-    // track how long we spend building the filter tree
-    miscTimer.start();
-
-    FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
-    c.analyze(condition);
-    RexNode pruneCondition = c.getFinalCondition();
-
-    logger.info("Total elapsed time to build and analyze filter tree: {} ms",
-        miscTimer.elapsed(TimeUnit.MILLISECONDS));
-    miscTimer.reset();
-
-    if (pruneCondition == null) {
-      logger.info("No conditions were found eligible for partition pruning." +
-          "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
-      return;
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Filter filterRel = call.rel(0);
+      final Project projectRel = call.rel(1);
+      final TableScan scanRel = call.rel(2);
+      doOnMatch(call, filterRel, projectRel, scanRel);
     }
+  }
 
-    // set up the partitions
-    List<String> newFiles = Lists.newArrayList();
-    long numTotal = 0; // total number of partitions
-    int batchIndex = 0;
-    String firstLocation = null;
-    LogicalExpression materializedExpr = null;
-
-    // Outer loop: iterate over a list of batches of PartitionLocations
-    for (List<PartitionLocation> partitions : descriptor) {
-      numTotal += partitions.size();
-      logger.debug("Evaluating partition pruning for batch {}", batchIndex);
-      if (batchIndex == 0) { // save the first location in case everything is pruned
-        firstLocation = partitions.get(0).getEntirePartitionLocation();
-      }
-      final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
-      final VectorContainer container = new VectorContainer();
-
-      try {
-        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
-        for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
-          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
-          MajorType type = descriptor.getVectorType(column, settings);
-          MaterializedField field = MaterializedField.create(column, type);
-          ValueVector v = TypeHelper.getNewVector(field, allocator);
-          v.allocateNew();
-          vectors[partitionColumnIndex] = v;
-          container.add(v);
-        }
-
-        // track how long we spend populating partition column vectors
-        miscTimer.start();
-
-        // populate partition vectors.
-        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
-
-        logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
-            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
-        miscTimer.reset();
-
-        // materialize the expression; only need to do this once
-        if (batchIndex == 0) {
-          materializedExpr = materializePruneExpr(pruneCondition, settings, scanRel, container);
-          if (materializedExpr == null) {
-            // continue without partition pruning; no need to log anything here since
-            // materializePruneExpr logs it already
-            logger.info("Total pruning elapsed time: {} ms",
-                totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
-            return;
-          }
-        }
-
-        output.allocateNew(partitions.size());
-
-        // start the timer to evaluate how long we spend in the interpreter evaluation
-        miscTimer.start();
-
-        InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
-
-        logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {}",
-            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
-        miscTimer.reset();
-
-        int recordCount = 0;
-        int qualifiedCount = 0;
-
-        // Inner loop: within each batch iterate over the PartitionLocations
-        for(PartitionLocation part: partitions){
-          if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
-            newFiles.add(part.getEntirePartitionLocation());
-            qualifiedCount++;
-          }
-          recordCount++;
-        }
-        logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
-        batchIndex++;
-      } catch (Exception e) {
-        logger.warn("Exception while trying to prune partition.", e);
-        logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
-        return; // continue without partition pruning
-      } finally {
-        container.clear();
-        if (output != null) {
-          output.clear();
-        }
-      }
+  private static class DirPruneScanFilterOnScanRule extends PruneScanRule {
+    public DirPruneScanFilterOnScanRule(OptimizerRulesContext optimizerRulesContext) {
+      super(RelOptHelper.some(Filter.class, RelOptHelper.any(TableScan.class)), "DirPruneScanRule:Filter_On_Scan", optimizerRulesContext);
     }
 
-    try {
-
-      boolean canDropFilter = true;
-
-      if (newFiles.isEmpty()) {
-        assert firstLocation != null;
-        newFiles.add(firstLocation);
-        canDropFilter = false;
-      }
-
-      if (newFiles.size() == numTotal) {
-        logger.info("No partitions were eligible for pruning");
-        return;
-      }
-
-      logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size());
-
-      List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
-      List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
-      conjuncts.removeAll(pruneConjuncts);
-      RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
-
-      RewriteCombineBinaryOperators reverseVisitor = new RewriteCombineBinaryOperators(true, filterRel.getCluster().getRexBuilder());
-
-      condition = condition.accept(reverseVisitor);
-      pruneCondition = pruneCondition.accept(reverseVisitor);
-
-      RelOptTableImpl t = (RelOptTableImpl) scanRel.getTable();
-      DynamicDrillTable oldTable = ((FileSystemPartitionDescriptor) descriptor).getTable();
-      FormatSelection formatSelection = (FormatSelection) oldTable.getSelection();
-      FileSelection oldFileSelection = formatSelection.getSelection();
-      FileSelection newFileSelection = new FileSelection(newFiles, oldFileSelection.selectionRoot, oldFileSelection.getParquetMetadata(), oldFileSelection.getFileStatuses());
-      FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
-      DynamicDrillTable newTable = new DynamicDrillTable(oldTable.getPlugin(), oldTable.getStorageEngineName(),
-          oldTable.getUserName(), newFormatSelection);
-      RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable);
-
-      // TODO: The new scan should come from the PartitionDescriptor
-      // TODO: Update the PartitionDescriptor to return ScanRel instead of the GroupScan 
-      EnumerableTableScan newScan = EnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl);
+    @Override
+    public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+      return new FileSystemPartitionDescriptor(settings, scanRel);
+    }
 
-      RelNode inputRel = newScan;
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final TableScan scan = call.rel(1);
+      return isQualifiedDirPruning(scan);
+    }
 
-      if (projectRel != null) {
-        inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
-      }
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Filter filterRel = call.rel(0);
+      final TableScan scanRel = call.rel(1);
+      doOnMatch(call, filterRel, null, scanRel);
+    }
+  }
 
-      if (newCondition.isAlwaysTrue() && canDropFilter) {
-        call.transformTo(inputRel);
-      } else {
-        final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
-        call.transformTo(newFilter);
-      }
+  public static final RelOptRule getDirFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
+    return new DirPruneScanFilterOnProjectRule(optimizerRulesContext);
+  }
 
-    } catch (Exception e) {
-      logger.warn("Exception while using the pruned partitions.", e);
-    } finally {
-      logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
-    }
+  public static final RelOptRule getDirFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
+    return new DirPruneScanFilterOnScanRule(optimizerRulesContext);
   }
 
-  protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
+  protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
     final String pruningClassName = getClass().getName();
     logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
     Stopwatch totalPruningTime = new Stopwatch();
@@ -373,7 +162,6 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
     final BufferAllocator allocator = optimizerContext.getAllocator();
 
-
     RexNode condition = null;
     if (projectRel == null) {
       condition = filterRel.getCondition();
@@ -541,16 +329,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
       condition = condition.accept(reverseVisitor);
       pruneCondition = pruneCondition.accept(reverseVisitor);
 
-      final DrillScanRel newScanRel =
-          new DrillScanRel(scanRel.getCluster(),
-              scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-              scanRel.getTable(),
-              descriptor.createNewGroupScan(newFiles),
-              scanRel.getRowType(),
-              scanRel.getColumns(),
-              true /*filter pushdown*/);
-
-      RelNode inputRel = newScanRel;
+      RelNode inputRel = descriptor.createTableScan(newFiles);
 
       if (projectRel != null) {
         inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
@@ -602,5 +381,28 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     return optimizerContext;
   }
 
-  public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel);
+  public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel);
+
+  private static boolean isQualifiedDirPruning(final TableScan scan) {
+    if (scan instanceof EnumerableTableScan) {
+      DrillTable drillTable;
+      drillTable = scan.getTable().unwrap(DrillTable.class);
+      if (drillTable == null) {
+        drillTable = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+      }
+      final Object selection = drillTable.getSelection();
+      if (selection instanceof FormatSelection
+          && ((FormatSelection)selection).supportDirPruning()) {
+        return true;  // Do directory-based pruning in Calcite logical
+      } else {
+        return false; // Do not do directory-based pruning in Calcite logical
+      }
+    } else if (scan instanceof DrillScanRel) {
+      final GroupScan groupScan = ((DrillScanRel) scan).getGroupScan();
+      // this rule is applicable only for dfs based partition pruning in Drill Logical
+      return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !((DrillScanRel)scan).partitionFilterPushdown();
+    }
+    return false;
+  }
+
 }
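
One subtlety in isQualifiedDirPruning() worth calling out: the descriptors build their replacement DrillScanRel with the pushdown flag set (the true /*filter pushdown*/ constructor argument), and the Drill-logical branch of the check rejects scans whose flag is already set. That is what keeps a scan pruned during Calcite logical planning from being pruned a second time in the Drill logical phase; annotated, the branch reads:

    // Drill logical branch of the qualification check (same logic as above).
    final GroupScan groupScan = ((DrillScanRel) scan).getGroupScan();
    return groupScan instanceof FileGroupScan                    // dfs-based scan only
        && groupScan.supportsPartitionFilterPushdown()           // format supports pruning
        && !((DrillScanRel) scan).partitionFilterPushdown();     // not already pruned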

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 686f7d7..d6bdc78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -534,6 +534,15 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
 
 
   private RelNode doLogicalPlanning(RelNode relNode) throws RelConversionException, SqlUnsupportedException {
+    // 1. Run a HepPlanner pass with the directory-based partition pruning rules
+    //    over the Calcite logical rel tree.
+    ImmutableSet<RelOptRule> dirPruneScanRules = ImmutableSet.<RelOptRule>builder()
+        .addAll(DrillRuleSets.getDirPruneScanRules(context))
+        .build();
+
+    relNode = doHepPlan(relNode, dirPruneScanRules, HepMatchOrder.BOTTOM_UP);
+    log("Post-Dir-Pruning", relNode, logger);
+
     if (! context.getPlannerSettings().isHepOptEnabled()) {
       return planner.transform(DrillSqlWorker.LOGICAL_RULES, relNode.getTraitSet().plus(DrillRel.DRILL_LOGICAL), relNode);
     } else {
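
doHepPlan() is an existing DefaultSqlHandler helper and is not shown in this diff. For readers unfamiliar with it, a bottom-up Hep pass in stock Calcite looks roughly like the sketch below; HepProgramBuilder, HepMatchOrder, and HepPlanner are standard Calcite classes from org.apache.calcite.plan.hep, but the exact wiring inside Drill's helper may differ:

    // Illustrative only: run a rule collection bottom-up with Calcite's HepPlanner.
    HepProgram program = new HepProgramBuilder()
        .addMatchOrder(HepMatchOrder.BOTTOM_UP)
        .addRuleCollection(dirPruneScanRules)
        .build();
    HepPlanner hepPlanner = new HepPlanner(program);
    hepPlanner.setRoot(relNode);
    RelNode transformed = hepPlanner.findBestExp();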

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 0d49b5c..bc3cef3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -52,7 +52,7 @@ public class FileSelection {
    * @param files  list of files
    * @param selectionRoot  root path for selections
    */
-  protected FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
+  public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
     this.statuses = statuses;
     this.files = files;
     this.selectionRoot = Preconditions.checkNotNull(selectionRoot);
@@ -246,4 +246,8 @@ public class FileSelection {
     return statuses;
   }
 
+  public boolean supportDirPrunig() {
+    return true;
+  }
+
 }
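
This visibility change is point (3) of the commit message: FileSelection.create() may rewrite selectionRoot while normalizing the selection, so the pruning code constructs the selection directly and keeps the original root intact, as seen in FileSystemPartitionDescriptor above:

    // Direct construction preserves getBaseTableLocation() as the selection root.
    final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());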

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
index 4473c5c..f802fb4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
@@ -61,4 +61,8 @@ public class FormatSelection {
     return selection;
   }
 
+  @JsonIgnore
+  public boolean supportDirPruning() {
+    return selection.supportDirPrunig();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
index 33dccd6..93201bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
@@ -59,4 +59,8 @@ public class ParquetFileSelection extends FileSelection {
     return new ParquetFileSelection(selection, metadata);
   }
 
+  @Override
+  public boolean supportDirPrunig() {
+    return false;
+  }
 }
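
Together, the two overrides above implement point (2) of the commit message: a ParquetFileSelection exists only when the selection was built from a Parquet metadata cache, and by reporting false it makes FormatSelection.supportDirPruning() return false, which in turn makes PruneScanRule.isQualifiedDirPruning() decline to fire during Calcite logical planning. Condensed from the PruneScanRule hunk above:

    // The Calcite-logical gate: metadata-cache backed selections are skipped.
    final Object selection = drillTable.getSelection();
    return selection instanceof FormatSelection
        && ((FormatSelection) selection).supportDirPruning();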

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 0ca6bb9..97df2ee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -1195,8 +1195,4 @@ public class TestExampleQueries extends BaseTestQuery {
         .run();
   }
 
-  @Test
-  public void t() throws Exception {
-    test("explain plan for select a from dfs.tmp.foo where dir0 = 1");
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index ba70788..e5d6603 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -345,4 +345,20 @@ public class TestPartitionFilter extends PlanTestBase {
     testIncludeFilter(query, 1, "Filter", 1);
   }
 
+  @Test
+  public void testLogicalDirPruning() throws Exception {
+    // 1995/Q1 contains one valid parquet file, while 1996/Q1 contains a badly formatted one.
+    // If dir pruning happens in the logical phase, the query runs fine, since the bad file is pruned before the ParquetGroupScan is built.
+    String query = String.format("select dir0, o_custkey from dfs_test.`%s/multilevel/parquetWithBadFormat` where dir0=1995", TEST_RES_PATH);
+    testExcludeFilter(query, 1, "Filter", 10);
+  }
+
+  @Test
+  public void testLogicalDirPruning2() throws Exception {
+    // 1995/Q1 contains one valid parquet file, while 1996/Q1 contains a badly formatted one.
+    // If dir pruning happens in the logical phase, the query runs fine, since the bad file is pruned before the ParquetGroupScan is built.
+    String query = String.format("select dir0, o_custkey from dfs_test.`%s/multilevel/parquetWithBadFormat` where dir0=1995 and o_custkey > 0", TEST_RES_PATH);
+    testIncludeFilter(query, 1, "Filter", 10);
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet
new file mode 100644
index 0000000..93514c4
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet
new file mode 100644
index 0000000..f62d37d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet
@@ -0,0 +1 @@
+BAD FORMAT!!!