You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2015/06/19 17:22:27 UTC

drill git commit: DRILL-3298: Make a single stream as input to the Window by inserting a SingleMergeExchange if only the ORDER-BY clause is present in a Window.

Repository: drill
Updated Branches:
  refs/heads/master 49b6dba5a -> 710f82942


DRILL-3298:  Make a single stream as input to the Window by inserting a SingleMergeExchange if only the ORDER-BY clause is present in a Window.

Incorporate review comments.  Add unit test.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/710f8294
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/710f8294
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/710f8294

Branch: refs/heads/master
Commit: 710f829429111f90b3977264cf127c43da440901
Parents: 49b6dba
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed Jun 17 14:04:24 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Fri Jun 19 08:04:18 2015 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/WindowPrule.java      | 37 ++++++++++++++++++--
 .../apache/drill/exec/TestWindowFunctions.java  | 24 +++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/710f8294/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
index f7728c8..034fd47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
@@ -29,6 +29,7 @@ import org.apache.calcite.util.BitSets;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillWindowRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
@@ -43,7 +44,7 @@ import org.apache.calcite.sql.SqlAggFunction;
 
 import java.util.List;
 
-public class WindowPrule extends RelOptRule {
+public class WindowPrule extends Prule {
   public static final RelOptRule INSTANCE = new WindowPrule();
 
   private WindowPrule() {
@@ -59,6 +60,7 @@ public class WindowPrule extends RelOptRule {
     //input.getTraitSet().subsumes()
 
     boolean partitionby = false;
+    boolean addMerge = false;
     for (final Ord<Window.Group> w : Ord.zip(window.groups)) {
       Window.Group windowBase = w.getValue();
       RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
@@ -69,6 +71,18 @@ public class WindowPrule extends RelOptRule {
 
         partitionby = true;
         traits = traits.plus(distOnAllKeys);
+      } else if (windowBase.orderKeys.getFieldCollations().size() > 0) {
+        // if only the order-by clause is specified, there is a single partition
+        // consisting of all the rows, so we do a distributed sort followed by a
+        // single merge as the input of the window operator
+        DrillDistributionTrait distKeys =
+            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+                ImmutableList.copyOf(getDistributionFieldsFromCollation(windowBase)));
+
+        traits = traits.plus(distKeys);
+        if(!isSingleMode(call)) {
+          addMerge = true;
+        }
       }
 
       // Add collation trait if either partition-by or order-by is specified.
@@ -77,7 +91,13 @@ public class WindowPrule extends RelOptRule {
         traits = traits.plus(collation);
       }
 
-      final RelNode convertedInput = convert(input, traits);
+      RelNode convertedInput = convert(input, traits);
+
+      if (addMerge) {
+        traits = traits.plus(DrillDistributionTrait.SINGLETON);
+        convertedInput = new SingleMergeExchangePrel(window.getCluster(), traits,
+                         convertedInput, windowBase.collation());
+      }
 
       List<RelDataTypeField> newRowFields = Lists.newArrayList();
       for(RelDataTypeField field : convertedInput.getRowType().getFieldList()) {
@@ -150,6 +170,19 @@ public class WindowPrule extends RelOptRule {
       DrillDistributionTrait.DistributionField field = new DrillDistributionTrait.DistributionField(group);
       groupByFields.add(field);
     }
+
     return groupByFields;
   }
+
+  private List<DistributionField> getDistributionFieldsFromCollation(Window.Group window) {
+    List<DistributionField> distFields = Lists.newArrayList();
+
+    for (RelFieldCollation relField : window.collation().getFieldCollations()) {
+      DistributionField field = new DistributionField(relField.getFieldIndex());
+      distFields.add(field);
+    }
+
+    return distFields;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/710f8294/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
index dc34632..9d660c3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java
@@ -19,11 +19,15 @@ package org.apache.drill.exec;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedFunctionException;
 import org.junit.Test;
 
 public class TestWindowFunctions extends BaseTestQuery {
+  static final String WORKING_PATH = TestTools.getWorkingPath();
+  static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
   private static void throwAsUnsupportedException(UserException ex) throws Exception {
     SqlUnsupportedException.errorClassNameToException(ex.getOrCreatePBError(false).getException().getExceptionClass());
     throw ex;
@@ -223,4 +227,24 @@ public class TestWindowFunctions extends BaseTestQuery {
 
     test(query);
   }
+
+  @Test // DRILL-3298
+  public void testCountEmptyPartitionByWithExchange() throws Exception {
+    String query = String.format("select count(*) over (order by o_orderpriority) as cnt from dfs.`%s/multilevel/parquet` where o_custkey < 100", TEST_RES_PATH);
+    try {
+      testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("cnt")
+        .optionSettingQueriesForTestQuery("alter session set `planner.slice_target` = 1")
+        .baselineValues(1l)
+        .baselineValues(4l)
+        .baselineValues(4l)
+        .baselineValues(4l)
+        .build().run();
+    } finally {
+      test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+    }
+  }
+
 }