You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2016/01/30 05:38:32 UTC
[2/4] drill git commit: DRILL-2517: (Prototype from Mehant) Move directory based partition pruning to logical phase.
DRILL-2517: (Prototype from Mehant) Move directory based partition pruning to logical phase.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c7dfba2e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c7dfba2e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c7dfba2e
Branch: refs/heads/master
Commit: c7dfba2e62fc6b063e953bd9cb1fc5b2903edc79
Parents: 09de31e
Author: Mehant Baid <me...@gmail.com>
Authored: Tue Nov 10 22:26:26 2015 -0800
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jan 29 19:30:27 2016 -0800
----------------------------------------------------------------------
.../HivePushPartitionFilterIntoScan.java | 9 +-
.../planner/FileSystemPartitionDescriptor.java | 27 +-
.../drill/exec/planner/logical/DrillTable.java | 8 +-
.../logical/partition/ParquetPruneScanRule.java | 9 +-
.../logical/partition/PruneScanRule.java | 268 +++++++++++++++++--
.../drill/exec/store/dfs/FileSelection.java | 4 +
.../org/apache/drill/TestExampleQueries.java | 7 +-
7 files changed, 285 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index fc2007e..17cd65a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.planner.sql.logical;
+import org.apache.calcite.rel.RelNode;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.PartitionDescriptor;
@@ -43,8 +44,8 @@ public abstract class HivePushPartitionFilterIntoScan {
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
- return new HivePartitionDescriptor(settings, scanRel, getOptimizerRulesContext().getManagedBuffer(),
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue);
}
@@ -77,8 +78,8 @@ public abstract class HivePushPartitionFilterIntoScan {
"HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
- return new HivePartitionDescriptor(settings, scanRel, getOptimizerRulesContext().getManagedBuffer(),
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index c4e4cb9..02658ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
@@ -35,6 +37,7 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
@@ -50,12 +53,14 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
private final String partitionLabel;
private final int partitionLabelLength;
private final Map<String, Integer> partitions = Maps.newHashMap();
- private final DrillScanRel scanRel;
+ private final EnumerableTableScan scanRel;
+ private final DynamicDrillTable table;
- public FileSystemPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ public FileSystemPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
this.partitionLabel = settings.getFsPartitionColumnLabel();
this.partitionLabelLength = partitionLabel.length();
- this.scanRel = scanRel;
+ this.scanRel = (EnumerableTableScan) scanRel;
+ table = scanRel.getTable().unwrap(DynamicDrillTable.class);
for(int i =0; i < 10; i++){
partitions.put(partitionLabel + i, i);
}
@@ -84,9 +89,17 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
@Override
public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
- final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation());
- final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
+ /*
+ THIS NEEDS TO CHANGE. WE SHOULD RETURN A ENUMERABLETABLESCAN??
+ final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true);
+ final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
return newScan;
+ */
+ return null;
+ }
+
+ public DynamicDrillTable getTable() {
+ return table;
}
@Override
@@ -124,13 +137,13 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
}
private String getBaseTableLocation() {
- final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
+ final FormatSelection origSelection = (FormatSelection) table.getSelection();
return origSelection.getSelection().selectionRoot;
}
@Override
protected void createPartitionSublists() {
- List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles();
+ List<String> fileLocations = ((FormatSelection) table.getSelection()).getAsFiles();
List<PartitionLocation> locations = new LinkedList<>();
for (String file: fileLocations) {
locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 106290d..1cb83b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -85,14 +85,14 @@ public abstract class DrillTable implements Table {
return selection;
}
- public void modifySelection(Object selection) {
- this.selection = selection;
- }
-
public String getStorageEngineName() {
return storageEngineName;
}
+ public String getUserName() {
+ return userName;
+ }
+
@Override
public Statistic getStatistic() {
return Statistics.UNKNOWN;
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
index b4f4e95..d92d2b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.logical.partition;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
@@ -41,8 +42,8 @@ public class ParquetPruneScanRule {
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
- return new ParquetPartitionDescriptor(settings, scanRel);
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
}
@Override
@@ -73,8 +74,8 @@ public class ParquetPruneScanRule {
"PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
- return new ParquetPartitionDescriptor(settings, scanRel);
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
+ return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 4cc9c46..bfd6597 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -25,12 +25,14 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionFunction;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
@@ -53,13 +55,15 @@ import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
-import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
@@ -72,6 +76,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.drill.exec.vector.ValueVector;
+
public abstract class PruneScanRule extends StoragePluginOptimizerRule {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);
@@ -84,71 +89,280 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
public static final RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
return new PruneScanRule(
- RelOptHelper.some(Filter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
+ RelOptHelper.some(LogicalFilter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))),
"PruneScanRule:Filter_On_Project",
optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
return new FileSystemPartitionDescriptor(settings, scanRel);
}
@Override
public boolean matches(RelOptRuleCall call) {
+ /*
final DrillScanRel scan = (DrillScanRel) call.rel(2);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for dfs based partition pruning
- if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
- } else {
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- }
+ return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+ */
+ return true;
}
@Override
public void onMatch(RelOptRuleCall call) {
- final FilterRel filterRel = (DrillFilterRel) call.rel(0);
- final ProjectRel projectRel = (DrillProjectRel) call.rel(1);
- final ScanRel scanRel = (DrillScanRel) call.rel(2);
- doOnMatch(call, filterRel, projectRel, scanRel);
+ final LogicalFilter filterRel = call.rel(0);
+ final Project projectRel = call.rel(1);
+ final EnumerableTableScan scanRel = call.rel(2);
+ doOnMatchLogical(call, filterRel, projectRel, scanRel);
}
};
}
public static final RelOptRule getFilterOnScan(OptimizerRulesContext optimizerRulesContext) {
return new PruneScanRule(
- RelOptHelper.some(Filter.class, RelOptHelper.any(EnumerableTableScan.class)),
+ RelOptHelper.some(LogicalFilter.class, RelOptHelper.any(EnumerableTableScan.class)),
"PruneScanRule:Filter_On_Scan", optimizerRulesContext) {
@Override
- public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) {
return new FileSystemPartitionDescriptor(settings, scanRel);
}
@Override
public boolean matches(RelOptRuleCall call) {
- final DrillScanRel scan = (DrillScanRel) call.rel(1);
+ /* final DrillScanRel scan = (DrillScanRel) call.rel(1);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for dfs based partition pruning
- if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
- } else {
- return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
- }
+ return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+ */
+ return true;
}
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
- final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
- doOnMatch(call, filterRel, null, scanRel);
+ final LogicalFilter filterRel = call.rel(0);
+ final EnumerableTableScan scanRel = call.rel(1);
+ doOnMatchLogical(call, filterRel, null, scanRel);
}
};
}
- protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, DrillScanRel scanRel) {
- final DrillTable table = scanRel.getTable().unwrap(DrillTable.class);
+ // TODO: Combine the doOnMatch and doOnMatchLogical
+ protected void doOnMatchLogical(RelOptRuleCall call, LogicalFilter filterRel, Project projectRel, EnumerableTableScan scanRel) {
+ final String pruningClassName = getClass().getName();
+ logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
+ Stopwatch totalPruningTime = new Stopwatch();
+ totalPruningTime.start();
+
+
+ final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
+ final BufferAllocator allocator = optimizerContext.getAllocator();
+
+
+ RexNode condition = null;
+ if (projectRel == null) {
+ condition = filterRel.getCondition();
+ } else {
+ // get the filter as if it were below the projection.
+ condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
+ }
+
+ RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
+ condition = condition.accept(visitor);
+
+ Map<Integer, String> fieldNameMap = Maps.newHashMap();
+ List<String> fieldNames = scanRel.getRowType().getFieldNames();
+ BitSet columnBitset = new BitSet();
+ BitSet partitionColumnBitSet = new BitSet();
+
+ int relColIndex = 0;
+ for (String field : fieldNames) {
+ final Integer partitionIndex = descriptor.getIdIfValid(field);
+ if (partitionIndex != null) {
+ fieldNameMap.put(partitionIndex, field);
+ partitionColumnBitSet.set(partitionIndex);
+ columnBitset.set(relColIndex);
+ }
+ relColIndex++;
+ }
+
+ if (partitionColumnBitSet.isEmpty()) {
+ logger.info("No partition columns are projected from the scan..continue. " +
+ "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ return;
+ }
+
+ // stop watch to track how long we spend in different phases of pruning
+ Stopwatch miscTimer = new Stopwatch();
+
+ // track how long we spend building the filter tree
+ miscTimer.start();
+
+ FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
+ c.analyze(condition);
+ RexNode pruneCondition = c.getFinalCondition();
+
+ logger.info("Total elapsed time to build and analyze filter tree: {} ms",
+ miscTimer.elapsed(TimeUnit.MILLISECONDS));
+ miscTimer.reset();
+
+ if (pruneCondition == null) {
+ logger.info("No conditions were found eligible for partition pruning." +
+ "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ return;
+ }
+
+ // set up the partitions
+ List<String> newFiles = Lists.newArrayList();
+ long numTotal = 0; // total number of partitions
+ int batchIndex = 0;
+ String firstLocation = null;
+ LogicalExpression materializedExpr = null;
+
+ // Outer loop: iterate over a list of batches of PartitionLocations
+ for (List<PartitionLocation> partitions : descriptor) {
+ numTotal += partitions.size();
+ logger.debug("Evaluating partition pruning for batch {}", batchIndex);
+ if (batchIndex == 0) { // save the first location in case everything is pruned
+ firstLocation = partitions.get(0).getEntirePartitionLocation();
+ }
+ final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
+ final VectorContainer container = new VectorContainer();
+
+ try {
+ final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
+ for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
+ SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+ MajorType type = descriptor.getVectorType(column, settings);
+ MaterializedField field = MaterializedField.create(column, type);
+ ValueVector v = TypeHelper.getNewVector(field, allocator);
+ v.allocateNew();
+ vectors[partitionColumnIndex] = v;
+ container.add(v);
+ }
+
+ // track how long we spend populating partition column vectors
+ miscTimer.start();
+
+ // populate partition vectors.
+ descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
+
+ logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
+ miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+ miscTimer.reset();
+
+ // materialize the expression; only need to do this once
+ if (batchIndex == 0) {
+ materializedExpr = materializePruneExpr(pruneCondition, settings, scanRel, container);
+ if (materializedExpr == null) {
+ // continue without partition pruning; no need to log anything here since
+ // materializePruneExpr logs it already
+ logger.info("Total pruning elapsed time: {} ms",
+ totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ return;
+ }
+ }
+
+ output.allocateNew(partitions.size());
+
+ // start the timer to evaluate how long we spend in the interpreter evaluation
+ miscTimer.start();
+
+ InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
+
+ logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {}",
+ miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+ miscTimer.reset();
+
+ int recordCount = 0;
+ int qualifiedCount = 0;
+
+ // Inner loop: within each batch iterate over the PartitionLocations
+ for(PartitionLocation part: partitions){
+ if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
+ newFiles.add(part.getEntirePartitionLocation());
+ qualifiedCount++;
+ }
+ recordCount++;
+ }
+ logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
+ batchIndex++;
+ } catch (Exception e) {
+ logger.warn("Exception while trying to prune partition.", e);
+ logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ return; // continue without partition pruning
+ } finally {
+ container.clear();
+ if (output != null) {
+ output.clear();
+ }
+ }
+ }
+
+ try {
+
+ boolean canDropFilter = true;
+
+ if (newFiles.isEmpty()) {
+ assert firstLocation != null;
+ newFiles.add(firstLocation);
+ canDropFilter = false;
+ }
+
+ if (newFiles.size() == numTotal) {
+ logger.info("No partitions were eligible for pruning");
+ return;
+ }
+
+ logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size());
+
+ List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
+ List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
+ conjuncts.removeAll(pruneConjuncts);
+ RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
+
+ RewriteCombineBinaryOperators reverseVisitor = new RewriteCombineBinaryOperators(true, filterRel.getCluster().getRexBuilder());
+
+ condition = condition.accept(reverseVisitor);
+ pruneCondition = pruneCondition.accept(reverseVisitor);
+
+ RelOptTableImpl t = (RelOptTableImpl) scanRel.getTable();
+ DynamicDrillTable oldTable = ((FileSystemPartitionDescriptor) descriptor).getTable();
+ FormatSelection formatSelection = (FormatSelection) oldTable.getSelection();
+ FileSelection oldFileSelection = formatSelection.getSelection();
+ FileSelection newFileSelection = new FileSelection(newFiles, oldFileSelection.selectionRoot, oldFileSelection.getParquetMetadata(), oldFileSelection.getFileStatuses());
+ FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
+ DynamicDrillTable newTable = new DynamicDrillTable(oldTable.getPlugin(), oldTable.getStorageEngineName(),
+ oldTable.getUserName(), newFormatSelection);
+ RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable);
+
+ // TODO: The new scan should come from the PartitionDescriptor
+ // TODO: Update the PartitionDescriptor to return ScanRel instead of the GroupScan
+ EnumerableTableScan newScan = EnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl);
+
+ RelNode inputRel = newScan;
+
+ if (projectRel != null) {
+ inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
+ }
+
+ if (newCondition.isAlwaysTrue() && canDropFilter) {
+ call.transformTo(inputRel);
+ } else {
+ final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
+ call.transformTo(newFilter);
+ }
+
+ } catch (Exception e) {
+ logger.warn("Exception while using the pruned partitions.", e);
+ } finally {
+ logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
+ }
+ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
final String pruningClassName = getClass().getName();
logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
Stopwatch totalPruningTime = new Stopwatch();
@@ -388,5 +602,5 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
return optimizerContext;
}
- public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel);
+ public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 6df3ffc..0d49b5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -242,4 +242,8 @@ public class FileSelection {
}
}
+ public List<FileStatus> getFileStatuses() {
+ return statuses;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c7dfba2e/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 0b52b9e..0ca6bb9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -1019,7 +1019,7 @@ public class TestExampleQueries extends BaseTestQuery {
.sqlQuery(query)
.expectsEmptyResultSet()
.optionSettingQueriesForTestQuery("ALTER SESSION SET `planner.enable_hashjoin` = false; " +
- "ALTER SESSION SET `planner.disable_exchanges` = true")
+ "ALTER SESSION SET `planner.disable_exchanges` = true")
.build()
.run();
@@ -1194,4 +1194,9 @@ public class TestExampleQueries extends BaseTestQuery {
.build()
.run();
}
+
+ @Test
+ public void t() throws Exception {
+ test("explain plan for select a from dfs.tmp.foo where dir0 = 1");
+ }
}
\ No newline at end of file