You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2016/01/30 05:38:33 UTC
[3/4] drill git commit: DRILL-2517: Move directory-based partition
pruning to Calcite logical planning phase.
DRILL-2517: Move directory-based partition pruning to Calcite logical planning phase.
1) Make the directory-based pruning rule work in both the Calcite logical and Drill logical planning phases.
2) Only apply directory-based pruning in the logical planning phase when there is no metadata cache.
3) Make the FileSelection constructor public, since FileSelection.create() would modify selectionRoot.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9b4008dc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9b4008dc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9b4008dc
Branch: refs/heads/master
Commit: 9b4008dc384bc5e0b9af9a048bfc93c5bcb902b2
Parents: c7dfba2
Author: Jinfeng Ni <jn...@apache.org>
Authored: Fri Jan 8 10:28:53 2016 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jan 29 19:30:27 2016 -0800
----------------------------------------------------------------------
.../planner/sql/HivePartitionDescriptor.java | 56 +--
.../HivePushPartitionFilterIntoScan.java | 15 +-
.../planner/FileSystemPartitionDescriptor.java | 81 ++++-
.../planner/ParquetPartitionDescriptor.java | 17 +-
.../drill/exec/planner/PartitionDescriptor.java | 12 +-
.../planner/logical/DrillPushProjIntoScan.java | 8 +-
.../exec/planner/logical/DrillRuleSets.java | 21 +-
.../logical/partition/ParquetPruneScanRule.java | 19 +-
.../logical/partition/PruneScanRule.java | 346 ++++---------------
.../planner/sql/handlers/DefaultSqlHandler.java | 9 +
.../drill/exec/store/dfs/FileSelection.java | 6 +-
.../drill/exec/store/dfs/FormatSelection.java | 4 +
.../store/parquet/ParquetFileSelection.java | 4 +
.../org/apache/drill/TestExampleQueries.java | 4 -
.../org/apache/drill/TestPartitionFilter.java | 16 +
.../1995/Q1/orders_95_q1.parquet | Bin 0 -> 2180 bytes
.../1996/Q1/badFormat.parquet | 1 +
17 files changed, 281 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index e1eb25e..e531f38 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.sql;
import io.netty.buffer.DrillBuf;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -27,6 +28,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.AbstractPartitionDescriptor;
import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation;
+import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.hive.HiveUtilities;
@@ -90,27 +92,6 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
}
@Override
- public GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException {
- HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
- HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
- List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
- List<HiveTable.HivePartition> newPartitions = new LinkedList<>();
-
- for (HiveTable.HivePartition part: oldPartitions) {
- String partitionLocation = part.getPartition().getSd().getLocation();
- for (String newPartitionLocation: newFiles) {
- if (partitionLocation.equals(newPartitionLocation)) {
- newPartitions.add(part);
- }
- }
- }
-
- HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions);
-
- return hiveScan.clone(newReadEntry);
- }
-
- @Override
public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions,
BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
int record = 0;
@@ -169,4 +150,37 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
sublistsCreated = true;
}
+ @Override
+ public TableScan createTableScan(List<String> newPartitions) throws Exception {
+ GroupScan newGroupScan = createNewGroupScan(newPartitions);
+ return new DrillScanRel(scanRel.getCluster(),
+ scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ scanRel.getTable(),
+ newGroupScan,
+ scanRel.getRowType(),
+ scanRel.getColumns(),
+ true /*filter pushdown*/);
+ }
+
+ private GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException {
+ HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
+ HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
+ List<HiveTable.HivePartition> oldPartitions = origReadEntry.partitions;
+ List<HiveTable.HivePartition> newPartitions = new LinkedList<>();
+
+ for (HiveTable.HivePartition part: oldPartitions) {
+ String partitionLocation = part.getPartition().getSd().getLocation();
+ for (String newPartitionLocation: newFiles) {
+ if (partitionLocation.equals(newPartitionLocation)) {
+ newPartitions.add(part);
+ }
+ }
+ }
+
+ HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions);
+
+ return hiveScan.clone(newReadEntry);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index 17cd65a..0bdfe99 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -19,6 +19,7 @@
package org.apache.drill.exec.planner.sql.logical;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.PartitionDescriptor;
@@ -44,7 +45,7 @@ public abstract class HivePushPartitionFilterIntoScan {
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue);
}
@@ -63,9 +64,9 @@ public abstract class HivePushPartitionFilterIntoScan {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+ final DrillFilterRel filterRel = call.rel(0);
+ final DrillProjectRel projectRel = call.rel(1);
+ final DrillScanRel scanRel = call.rel(2);
doOnMatch(call, filterRel, projectRel, scanRel);
}
};
@@ -78,7 +79,7 @@ public abstract class HivePushPartitionFilterIntoScan {
"HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue);
}
@@ -97,8 +98,8 @@ public abstract class HivePushPartitionFilterIntoScan {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+ final DrillFilterRel filterRel = call.rel(0);
+ final DrillScanRel scanRel = call.rel(1);
doOnMatch(call, filterRel, null, scanRel);
}
};
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 02658ed..04a3f97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -24,11 +24,14 @@ import java.util.List;
import java.util.Map;
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
@@ -36,7 +39,10 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -53,14 +59,22 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
private final String partitionLabel;
private final int partitionLabelLength;
private final Map<String, Integer> partitions = Maps.newHashMap();
- private final EnumerableTableScan scanRel;
- private final DynamicDrillTable table;
+ private final TableScan scanRel;
+ private final DrillTable table;
- public FileSystemPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public FileSystemPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+ Preconditions.checkArgument(scanRel instanceof DrillScanRel || scanRel instanceof EnumerableTableScan);
this.partitionLabel = settings.getFsPartitionColumnLabel();
this.partitionLabelLength = partitionLabel.length();
- this.scanRel = (EnumerableTableScan) scanRel;
- table = scanRel.getTable().unwrap(DynamicDrillTable.class);
+ this.scanRel = scanRel;
+ DrillTable unwrap;
+ unwrap = scanRel.getTable().unwrap(DrillTable.class);
+ if (unwrap == null) {
+ unwrap = scanRel.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+ }
+
+ table = unwrap;
+
for(int i =0; i < 10; i++){
partitions.put(partitionLabel + i, i);
}
@@ -87,18 +101,18 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
return MAX_NESTED_SUBDIRS;
}
- @Override
- public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
- /*
- THIS NEEDS TO CHANGE. WE SHOULD RETURN A ENUMERABLETABLESCAN??
- final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true);
- final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
- return newScan;
- */
- return null;
- }
-
- public DynamicDrillTable getTable() {
+// @Override
+// public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
+// if (scanRel instanceof DrillScanRel) {
+// final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+// final FileGroupScan newScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+// return newScan;
+// } else {
+// throw new UnsupportedOperationException("Does not allow to get groupScan for EnumerableTableScan");
+// }
+// }
+
+ public DrillTable getTable() {
return table;
}
@@ -152,4 +166,37 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
sublistsCreated = true;
}
+ @Override
+ public TableScan createTableScan(List<String> newFiles) throws Exception {
+ if (scanRel instanceof DrillScanRel) {
+ final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+ final FileGroupScan newGroupScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+ return new DrillScanRel(scanRel.getCluster(),
+ scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ scanRel.getTable(),
+ newGroupScan,
+ scanRel.getRowType(),
+ ((DrillScanRel) scanRel).getColumns(),
+ true /*filter pushdown*/);
+ } else if (scanRel instanceof EnumerableTableScan) {
+ return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles);
+ } else {
+ throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
+ }
+ }
+
+ private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles) {
+ final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
+ final FormatSelection formatSelection = (FormatSelection) table.getSelection();
+ final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+ final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
+ final DrillTranslatableTable newTable = new DrillTranslatableTable(
+ new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
+ table.getUserName(),
+ newFormatSelection));
+ final RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable);
+
+ return EnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index cda5a5e..81bcf03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -17,11 +17,13 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -78,8 +80,7 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
return partitionColumns.size();
}
- @Override
- public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
+ private GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation());
final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
return newScan;
@@ -128,4 +129,16 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
sublistsCreated = true;
}
+ @Override
+ public TableScan createTableScan(List<String> newFiles) throws Exception {
+ final GroupScan newGroupScan = createNewGroupScan(newFiles);
+
+ return new DrillScanRel(scanRel.getCluster(),
+ scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ scanRel.getTable(),
+ newGroupScan,
+ scanRel.getRowType(),
+ scanRel.getColumns(),
+ true /*filter pushdown*/);
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 726d8bc..dd3b084 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.GroupScan;
@@ -53,8 +54,6 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
// Maximum level of partition nesting/ hierarchy supported
public int getMaxHierarchyLevel();
- public GroupScan createNewGroupScan(List<String> newFiles) throws Exception;
-
/**
* Method creates an in memory representation of all the partitions. For each level of partitioning we
* will create a value vector which this method will populate for all the partitions with the values of the
@@ -74,4 +73,13 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
* @return
*/
TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings);
+
+ /**
+ * Creates a new TableScan rel node, given the list of new partitions or new files to scan.
+ * @param newPartitions
+ * @return
+ * @throws Exception
+ */
+ public TableScan createTableScan(List<String> newPartitions) throws Exception;
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
index 1fd1cd7..33c840b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
@@ -54,8 +54,14 @@ public class DrillPushProjIntoScan extends RelOptRule {
try {
ProjectPushInfo columnInfo = PrelUtil.getColumns(scan.getRowType(), proj.getProjects());
+ // get DrillTable, either wrapped in RelOptTable, or DrillTranslatableTable.
+ DrillTable table = scan.getTable().unwrap(DrillTable.class);
+ if (table == null) {
+ table = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+ }
+
if (columnInfo == null || columnInfo.isStarQuery() //
- || !scan.getTable().unwrap(DrillTable.class) //
+ || !table //
.getGroupScan().canPushdownProjects(columnInfo.columns)) {
return;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 6f1f995..d9609d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -204,8 +204,8 @@ public class DrillRuleSets {
public static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
.add(
- PruneScanRule.getFilterOnProject(optimizerRulesContext),
- PruneScanRule.getFilterOnScan(optimizerRulesContext),
+ PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
+ PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext)
)
@@ -214,6 +214,23 @@ public class DrillRuleSets {
return new DrillRuleSet(pruneRules);
}
+ /**
+ * Get an immutable list of directory-based partition pruning rules that will be used in Calcite logical planning.
+ * @param optimizerRulesContext
+ * @return
+ */
+ public static RuleSet getDirPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
+ final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
+ .add(
+ PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
+ PruneScanRule.getDirFilterOnScan(optimizerRulesContext)
+ )
+ .build();
+
+ return new DrillRuleSet(pruneRules);
+
+ }
+
// Ruleset for join permutation, used only in VolcanoPlanner.
public static RuleSet getJoinPermRules(OptimizerRulesContext optimizerRulesContext) {
return new DrillRuleSet(ImmutableSet.<RelOptRule> builder().add( //
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
index d92d2b0..f5dbb9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.logical.partition;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
@@ -42,13 +43,13 @@ public class ParquetPruneScanRule {
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
}
@Override
public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scan = (DrillScanRel) call.rel(2);
+ final DrillScanRel scan = call.rel(2);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for parquet based partition pruning
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
@@ -60,9 +61,9 @@ public class ParquetPruneScanRule {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+ final DrillFilterRel filterRel = call.rel(0);
+ final DrillProjectRel projectRel = call.rel(1);
+ final DrillScanRel scanRel = call.rel(2);
doOnMatch(call, filterRel, projectRel, scanRel);
}
};
@@ -74,13 +75,13 @@ public class ParquetPruneScanRule {
"PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
}
@Override
public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scan = (DrillScanRel) call.rel(1);
+ final DrillScanRel scan = call.rel(1);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for parquet based partition pruning
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
@@ -92,8 +93,8 @@ public class ParquetPruneScanRule {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+ final DrillFilterRel filterRel = call.rel(0);
+ final DrillScanRel scanRel = call.rel(1);
doOnMatch(call, filterRel, null, scanRel);
}
};
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index bfd6597..aefd247 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -25,8 +25,12 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.jdbc.CalciteAbstractSchema;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.BitSets;
@@ -55,6 +59,8 @@ import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -64,6 +70,7 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.parquet.ParquetFileSelection;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
@@ -87,282 +94,64 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
this.optimizerContext = optimizerContext;
}
- public static final RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
- return new PruneScanRule(
- RelOptHelper.some(LogicalFilter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
- "PruneScanRule:Filter_On_Project",
- optimizerRulesContext) {
-
- @Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
- return new FileSystemPartitionDescriptor(settings, scanRel);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- /*
- final DrillScanRel scan = (DrillScanRel) call.rel(2);
- GroupScan groupScan = scan.getGroupScan();
- // this rule is applicable only for dfs based partition pruning
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- */
- return true;
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final LogicalFilter filterRel = call.rel(0);
- final Project projectRel = call.rel(1);
- final EnumerableTableScan scanRel = call.rel(2);
- doOnMatchLogical(call, filterRel, projectRel, scanRel);
- }
- };
- }
-
- public static final RelOptRule getFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
- return new PruneScanRule(
- RelOptHelper.some(LogicalFilter.class, RelOptHelper.any(EnumerableTableScan.class)),
- "PruneScanRule:Filter_On_Scan", optimizerRulesContext) {
-
- @Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
- return new FileSystemPartitionDescriptor(settings, scanRel);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- /* final DrillScanRel scan = (DrillScanRel) call.rel(1);
- GroupScan groupScan = scan.getGroupScan();
- // this rule is applicable only for dfs based partition pruning
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- */
- return true;
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final LogicalFilter filterRel = call.rel(0);
- final EnumerableTableScan scanRel = call.rel(1);
- doOnMatchLogical(call, filterRel, null, scanRel);
- }
- };
- }
-
- // TODO: Combine the doOnMatch and doOnMatchLogical
- protected void doOnMatchLogical(RelOptRuleCall call, LogicalFilter filterRel, Project projectRel, EnumerableTableScan scanRel) {
- final String pruningClassName = getClass().getName();
- logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
- Stopwatch totalPruningTime = new Stopwatch();
- totalPruningTime.start();
-
-
- final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
- PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
- final BufferAllocator allocator = optimizerContext.getAllocator();
-
-
- RexNode condition = null;
- if (projectRel == null) {
- condition = filterRel.getCondition();
- } else {
- // get the filter as if it were below the projection.
- condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
+ private static class DirPruneScanFilterOnProjectRule extends PruneScanRule { // dir-pruning rule for Filter -> Project -> TableScan
+ public DirPruneScanFilterOnProjectRule(OptimizerRulesContext optimizerRulesContext) {
+ super(RelOptHelper.some(Filter.class, RelOptHelper.some(Project.class, RelOptHelper.any(TableScan.class))), "DirPruneScanRule:Filter_On_Project", optimizerRulesContext);
}
- RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
- condition = condition.accept(visitor);
-
- Map<Integer, String> fieldNameMap = Maps.newHashMap();
- List<String> fieldNames = scanRel.getRowType().getFieldNames();
- BitSet columnBitset = new BitSet();
- BitSet partitionColumnBitSet = new BitSet();
-
- int relColIndex = 0;
- for (String field : fieldNames) {
- final Integer partitionIndex = descriptor.getIdIfValid(field);
- if (partitionIndex != null) {
- fieldNameMap.put(partitionIndex, field);
- partitionColumnBitSet.set(partitionIndex);
- columnBitset.set(relColIndex);
- }
- relColIndex++;
+ @Override
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) { // file-system partition descriptor over the matched scan
+ return new FileSystemPartitionDescriptor(settings, scanRel);
}
- if (partitionColumnBitSet.isEmpty()) {
- logger.info("No partition columns are projected from the scan..continue. " +
- "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- return;
+ @Override
+ public boolean matches(RelOptRuleCall call) { // fire only when the scan (rel 2) qualifies for dir pruning
+ final TableScan scan = call.rel(2);
+ return isQualifiedDirPruning(scan);
}
- // stop watch to track how long we spend in different phases of pruning
- Stopwatch miscTimer = new Stopwatch();
-
- // track how long we spend building the filter tree
- miscTimer.start();
-
- FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
- c.analyze(condition);
- RexNode pruneCondition = c.getFinalCondition();
-
- logger.info("Total elapsed time to build and analyze filter tree: {} ms",
- miscTimer.elapsed(TimeUnit.MILLISECONDS));
- miscTimer.reset();
-
- if (pruneCondition == null) {
- logger.info("No conditions were found eligible for partition pruning." +
- "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- return;
+ @Override
+ public void onMatch(RelOptRuleCall call) { // rels 0..2 are filter, project, scan; delegate to the shared doOnMatch
+ final Filter filterRel = call.rel(0);
+ final Project projectRel = call.rel(1);
+ final TableScan scanRel = call.rel(2);
+ doOnMatch(call, filterRel, projectRel, scanRel);
}
+ }
- // set up the partitions
- List<String> newFiles = Lists.newArrayList();
- long numTotal = 0; // total number of partitions
- int batchIndex = 0;
- String firstLocation = null;
- LogicalExpression materializedExpr = null;
-
- // Outer loop: iterate over a list of batches of PartitionLocations
- for (List<PartitionLocation> partitions : descriptor) {
- numTotal += partitions.size();
- logger.debug("Evaluating partition pruning for batch {}", batchIndex);
- if (batchIndex == 0) { // save the first location in case everything is pruned
- firstLocation = partitions.get(0).getEntirePartitionLocation();
- }
- final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
- final VectorContainer container = new VectorContainer();
-
- try {
- final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
- for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
- SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
- MajorType type = descriptor.getVectorType(column, settings);
- MaterializedField field = MaterializedField.create(column, type);
- ValueVector v = TypeHelper.getNewVector(field, allocator);
- v.allocateNew();
- vectors[partitionColumnIndex] = v;
- container.add(v);
- }
-
- // track how long we spend populating partition column vectors
- miscTimer.start();
-
- // populate partition vectors.
- descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
-
- logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
- miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
- miscTimer.reset();
-
- // materialize the expression; only need to do this once
- if (batchIndex == 0) {
- materializedExpr = materializePruneExpr(pruneCondition, settings, scanRel, container);
- if (materializedExpr == null) {
- // continue without partition pruning; no need to log anything here since
- // materializePruneExpr logs it already
- logger.info("Total pruning elapsed time: {} ms",
- totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- return;
- }
- }
-
- output.allocateNew(partitions.size());
-
- // start the timer to evaluate how long we spend in the interpreter evaluation
- miscTimer.start();
-
- InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
-
- logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {}",
- miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
- miscTimer.reset();
-
- int recordCount = 0;
- int qualifiedCount = 0;
-
- // Inner loop: within each batch iterate over the PartitionLocations
- for(PartitionLocation part: partitions){
- if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
- newFiles.add(part.getEntirePartitionLocation());
- qualifiedCount++;
- }
- recordCount++;
- }
- logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
- batchIndex++;
- } catch (Exception e) {
- logger.warn("Exception while trying to prune partition.", e);
- logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- return; // continue without partition pruning
- } finally {
- container.clear();
- if (output != null) {
- output.clear();
- }
- }
+ private static class DirPruneScanFilterOnScanRule extends PruneScanRule { // dir-pruning rule for Filter -> TableScan (no Project)
+ public DirPruneScanFilterOnScanRule(OptimizerRulesContext optimizerRulesContext) {
+ super(RelOptHelper.some(Filter.class, RelOptHelper.any(TableScan.class)), "DirPruneScanRule:Filter_On_Scan", optimizerRulesContext);
}
- try {
-
- boolean canDropFilter = true;
-
- if (newFiles.isEmpty()) {
- assert firstLocation != null;
- newFiles.add(firstLocation);
- canDropFilter = false;
- }
-
- if (newFiles.size() == numTotal) {
- logger.info("No partitions were eligible for pruning");
- return;
- }
-
- logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size());
-
- List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
- List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
- conjuncts.removeAll(pruneConjuncts);
- RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
-
- RewriteCombineBinaryOperators reverseVisitor = new RewriteCombineBinaryOperators(true, filterRel.getCluster().getRexBuilder());
-
- condition = condition.accept(reverseVisitor);
- pruneCondition = pruneCondition.accept(reverseVisitor);
-
- RelOptTableImpl t = (RelOptTableImpl) scanRel.getTable();
- DynamicDrillTable oldTable = ((FileSystemPartitionDescriptor) descriptor).getTable();
- FormatSelection formatSelection = (FormatSelection) oldTable.getSelection();
- FileSelection oldFileSelection = formatSelection.getSelection();
- FileSelection newFileSelection = new FileSelection(newFiles, oldFileSelection.selectionRoot, oldFileSelection.getParquetMetadata(), oldFileSelection.getFileStatuses());
- FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
- DynamicDrillTable newTable = new DynamicDrillTable(oldTable.getPlugin(), oldTable.getStorageEngineName(),
- oldTable.getUserName(), newFormatSelection);
- RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable);
-
- // TODO: The new scan should come from the PartitionDescriptor
- // TODO: Update the PartitionDescriptor to return ScanRel instead of the GroupScan
- EnumerableTableScan newScan = EnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl);
+ @Override
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) { // file-system partition descriptor over the matched scan
+ return new FileSystemPartitionDescriptor(settings, scanRel);
+ }
- RelNode inputRel = newScan;
+ @Override
+ public boolean matches(RelOptRuleCall call) { // fire only when the scan (rel 1) qualifies for dir pruning
+ final TableScan scan = call.rel(1);
+ return isQualifiedDirPruning(scan);
+ }
- if (projectRel != null) {
- inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
- }
+ @Override
+ public void onMatch(RelOptRuleCall call) { // rels 0..1 are filter, scan; no project, so null is passed to doOnMatch
+ final Filter filterRel = call.rel(0);
+ final TableScan scanRel = call.rel(1);
+ doOnMatch(call, filterRel, null, scanRel);
+ }
+ }
- if (newCondition.isAlwaysTrue() && canDropFilter) {
- call.transformTo(inputRel);
- } else {
- final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
- call.transformTo(newFilter);
- }
+ public static final RelOptRule getDirFilterOnProject(OptimizerRulesContext optimizerRulesContext) { // factory for the Filter -> Project -> TableScan dir-pruning rule
+ return new DirPruneScanFilterOnProjectRule(optimizerRulesContext);
+ }
- } catch (Exception e) {
- logger.warn("Exception while using the pruned partitions.", e);
- } finally {
- logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
- }
+ public static final RelOptRule getDirFilterOnScan(OptimizerRulesContext optimizerRulesContext) { // factory for the Filter -> TableScan dir-pruning rule
+ return new DirPruneScanFilterOnScanRule(optimizerRulesContext);
}
- protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
+ protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
final String pruningClassName = getClass().getName();
logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
Stopwatch totalPruningTime = new Stopwatch();
@@ -373,7 +162,6 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
final BufferAllocator allocator = optimizerContext.getAllocator();
-
RexNode condition = null;
if (projectRel == null) {
condition = filterRel.getCondition();
@@ -541,16 +329,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
condition = condition.accept(reverseVisitor);
pruneCondition = pruneCondition.accept(reverseVisitor);
- final DrillScanRel newScanRel =
- new DrillScanRel(scanRel.getCluster(),
- scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
- scanRel.getTable(),
- descriptor.createNewGroupScan(newFiles),
- scanRel.getRowType(),
- scanRel.getColumns(),
- true /*filter pushdown*/);
-
- RelNode inputRel = newScanRel;
+ RelNode inputRel = descriptor.createTableScan(newFiles);
if (projectRel != null) {
inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
@@ -602,5 +381,28 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
return optimizerContext;
}
- public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel);
+ public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel); // descriptor doOnMatch uses to enumerate partitions and build the pruned scan
+
+  /** Returns whether {@code scan} qualifies for directory-based partition pruning. */
+  private static boolean isQualifiedDirPruning(final TableScan scan) {
+    if (scan instanceof EnumerableTableScan) {
+      DrillTable drillTable = scan.getTable().unwrap(DrillTable.class);
+      if (drillTable == null) {
+        final DrillTranslatableTable translatable = scan.getTable().unwrap(DrillTranslatableTable.class);
+        drillTable = translatable == null ? null : translatable.getDrillTable();
+      }
+      // unwrap() returns null for a non-Drill table: decline instead of throwing an NPE from matches().
+      final Object selection = drillTable == null ? null : drillTable.getSelection();
+      // Do directory-based pruning in Calcite logical only if the selection supports it.
+      return selection instanceof FormatSelection
+          && ((FormatSelection) selection).supportDirPruning();
+    } else if (scan instanceof DrillScanRel) {
+      final GroupScan groupScan = ((DrillScanRel) scan).getGroupScan();
+      // this rule is applicable only for dfs based partition pruning in Drill Logical
+      return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !((DrillScanRel) scan).partitionFilterPushdown();
+    }
+    // Any other scan type is not eligible.
+    return false;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 686f7d7..d6bdc78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -534,6 +534,15 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
private RelNode doLogicalPlanning(RelNode relNode) throws RelConversionException, SqlUnsupportedException {
+ // 1. Call HepPlanner with the directory-based partition pruning rules on the Calcite logical rels,
+ // before the Drill logical planning below.
+ ImmutableSet<RelOptRule> dirPruneScanRules = ImmutableSet.<RelOptRule>builder()
+ .addAll(DrillRuleSets.getDirPruneScanRules(context))
+ .build();
+
+ relNode = doHepPlan(relNode, dirPruneScanRules, HepMatchOrder.BOTTOM_UP);
+ log("Post-Dir-Pruning", relNode, logger);
+
if (! context.getPlannerSettings().isHepOptEnabled()) {
return planner.transform(DrillSqlWorker.LOGICAL_RULES, relNode.getTraitSet().plus(DrillRel.DRILL_LOGICAL), relNode);
} else {
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 0d49b5c..bc3cef3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -52,7 +52,7 @@ public class FileSelection {
* @param files list of files
* @param selectionRoot root path for selections
*/
- protected FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
+ public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
this.statuses = statuses;
this.files = files;
this.selectionRoot = Preconditions.checkNotNull(selectionRoot);
@@ -246,4 +246,8 @@ public class FileSelection {
return statuses;
}
+ public boolean supportDirPrunig() { // NOTE(review): "Prunig" is misspelled; kept because FormatSelection and ParquetFileSelection use this name
+ return true; // default: selections allow directory-based pruning; subclasses (e.g. ParquetFileSelection) may opt out
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
index 4473c5c..f802fb4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
@@ -61,4 +61,8 @@ public class FormatSelection {
return selection;
}
+ @JsonIgnore // derived from the wrapped selection; not serialized as a property
+ public boolean supportDirPruning() { // whether the wrapped FileSelection allows directory-based pruning
+ return selection.supportDirPrunig();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
index 33dccd6..93201bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
@@ -59,4 +59,8 @@ public class ParquetFileSelection extends FileSelection {
return new ParquetFileSelection(selection, metadata);
}
+ @Override
+ public boolean supportDirPrunig() { // opt out of dir pruning in Calcite logical planning (presumably because these selections carry Parquet metadata)
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 0ca6bb9..97df2ee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -1195,8 +1195,4 @@ public class TestExampleQueries extends BaseTestQuery {
.run();
}
- @Test
- public void t() throws Exception {
- test("explain plan for select a from dfs.tmp.foo where dir0 = 1");
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index ba70788..e5d6603 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -345,4 +345,20 @@ public class TestPartitionFilter extends PlanTestBase {
testIncludeFilter(query, 1, "Filter", 1);
}
+ @Test
+ public void testLogicalDirPruning() throws Exception {
+ // 1995/Q1 contains one valid parquet, while 1996/Q1 contains bad format parquet.
+ // If dir pruning happens in logical, the query will run fine, since the bad parquet has been pruned before we build ParquetGroupScan. dir0=1995 is consumed entirely by pruning, so no Filter is expected in the plan.
+ String query = String.format("select dir0, o_custkey from dfs_test.`%s/multilevel/parquetWithBadFormat` where dir0=1995", TEST_RES_PATH);
+ testExcludeFilter(query, 1, "Filter", 10);
+ }
+
+ @Test
+ public void testLogicalDirPruning2() throws Exception {
+ // 1995/Q1 contains one valid parquet, while 1996/Q1 contains bad format parquet.
+ // If dir pruning happens in logical, the query will run fine, since the bad parquet has been pruned before we build ParquetGroupScan. The extra o_custkey > 0 predicate is not a partition condition, so a Filter is expected to remain.
+ String query = String.format("select dir0, o_custkey from dfs_test.`%s/multilevel/parquetWithBadFormat` where dir0=1995 and o_custkey > 0", TEST_RES_PATH);
+ testIncludeFilter(query, 1, "Filter", 10);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet
new file mode 100644
index 0000000..93514c4
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/9b4008dc/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet
new file mode 100644
index 0000000..f62d37d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet
@@ -0,0 +1 @@
+BAD FORMAT!!!