You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2015/06/19 17:22:27 UTC
drill git commit: DRILL-3298: Make a single stream as input to the
Window by inserting a SingleMergeExchange if only the ORDER-BY clause is
present in a Window.
Repository: drill
Updated Branches:
refs/heads/master 49b6dba5a -> 710f82942
DRILL-3298: Make a single stream as input to the Window by inserting a SingleMergeExchange if only the ORDER-BY clause is present in a Window.
Incorporate review comments. Add unit test.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/710f8294
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/710f8294
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/710f8294
Branch: refs/heads/master
Commit: 710f829429111f90b3977264cf127c43da440901
Parents: 49b6dba
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed Jun 17 14:04:24 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Fri Jun 19 08:04:18 2015 -0700
----------------------------------------------------------------------
.../exec/planner/physical/WindowPrule.java | 37 ++++++++++++++++++--
.../apache/drill/exec/TestWindowFunctions.java | 24 +++++++++++++
2 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/710f8294/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
index f7728c8..034fd47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
@@ -29,6 +29,7 @@ import org.apache.calcite.util.BitSets;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillWindowRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationImpl;
import org.apache.calcite.rel.RelFieldCollation;
@@ -43,7 +44,7 @@ import org.apache.calcite.sql.SqlAggFunction;
import java.util.List;
-public class WindowPrule extends RelOptRule {
+public class WindowPrule extends Prule {
public static final RelOptRule INSTANCE = new WindowPrule();
private WindowPrule() {
@@ -59,6 +60,7 @@ public class WindowPrule extends RelOptRule {
//input.getTraitSet().subsumes()
boolean partitionby = false;
+ boolean addMerge = false;
for (final Ord<Window.Group> w : Ord.zip(window.groups)) {
Window.Group windowBase = w.getValue();
RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
@@ -69,6 +71,18 @@ public class WindowPrule extends RelOptRule {
partitionby = true;
traits = traits.plus(distOnAllKeys);
+ } else if (windowBase.orderKeys.getFieldCollations().size() > 0) {
+ // if only the order-by clause is specified, there is a single partition
+ // consisting of all the rows, so we do a distributed sort followed by a
+ // single merge as the input of the window operator
+ DrillDistributionTrait distKeys =
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionFieldsFromCollation(windowBase)));
+
+ traits = traits.plus(distKeys);
+ if(!isSingleMode(call)) {
+ addMerge = true;
+ }
}
// Add collation trait if either partition-by or order-by is specified.
@@ -77,7 +91,13 @@ public class WindowPrule extends RelOptRule {
traits = traits.plus(collation);
}
- final RelNode convertedInput = convert(input, traits);
+ RelNode convertedInput = convert(input, traits);
+
+ if (addMerge) {
+ traits = traits.plus(DrillDistributionTrait.SINGLETON);
+ convertedInput = new SingleMergeExchangePrel(window.getCluster(), traits,
+ convertedInput, windowBase.collation());
+ }
List<RelDataTypeField> newRowFields = Lists.newArrayList();
for(RelDataTypeField field : convertedInput.getRowType().getFieldList()) {
@@ -150,6 +170,19 @@ public class WindowPrule extends RelOptRule {
DrillDistributionTrait.DistributionField field = new DrillDistributionTrait.DistributionField(group);
groupByFields.add(field);
}
+
return groupByFields;
}
+
+ private List<DistributionField> getDistributionFieldsFromCollation(Window.Group window) {
+ List<DistributionField> distFields = Lists.newArrayList();
+
+ for (RelFieldCollation relField : window.collation().getFieldCollations()) {
+ DistributionField field = new DistributionField(relField.getFieldIndex());
+ distFields.add(field);
+ }
+
+ return distFields;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/710f8294/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
index dc34632..9d660c3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
@@ -19,11 +19,15 @@ package org.apache.drill.exec;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
import org.apache.drill.exec.work.foreman.UnsupportedFunctionException;
import org.junit.Test;
public class TestWindowFunctions extends BaseTestQuery {
+ static final String WORKING_PATH = TestTools.getWorkingPath();
+ static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
private static void throwAsUnsupportedException(UserException ex) throws Exception {
SqlUnsupportedException.errorClassNameToException(ex.getOrCreatePBError(false).getException().getExceptionClass());
throw ex;
@@ -223,4 +227,24 @@ public class TestWindowFunctions extends BaseTestQuery {
test(query);
}
+
+ @Test // DRILL-3298
+ public void testCountEmptyPartitionByWithExchange() throws Exception {
+ String query = String.format("select count(*) over (order by o_orderpriority) as cnt from dfs.`%s/multilevel/parquet` where o_custkey < 100", TEST_RES_PATH);
+ try {
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("cnt")
+ .optionSettingQueriesForTestQuery("alter session set `planner.slice_target` = 1")
+ .baselineValues(1l)
+ .baselineValues(4l)
+ .baselineValues(4l)
+ .baselineValues(4l)
+ .build().run();
+ } finally {
+ test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ }
+ }
+
}