You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2016/08/24 16:33:26 UTC

drill git commit: DRILL-4833: Insert exchanges on the inputs of union-all such that the parent and children can be independently parallelized.

Repository: drill
Updated Branches:
  refs/heads/master d5e74b61b -> 0ccc81aee


DRILL-4833: Insert exchanges on the inputs of union-all such that the parent and children can be independently parallelized.

Add planner option to enable/disable distribution for union-all.

close apache/drill#566


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0ccc81ae
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0ccc81ae
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0ccc81ae

Branch: refs/heads/master
Commit: 0ccc81aee22d74743c801f2a483f1aefdb96c87d
Parents: d5e74b6
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed Aug 10 08:38:25 2016 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Wed Aug 24 09:29:58 2016 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/PlannerSettings.java  |  6 ++
 .../exec/planner/physical/UnionAllPrule.java    | 53 ++++++++++++---
 .../server/options/SystemOptionManager.java     |  1 +
 .../java/org/apache/drill/TestUnionAll.java     | 69 ++++++++++++++++++++
 4 files changed, 119 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0ccc81ae/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 51d3708..218bf5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -80,6 +80,8 @@ public class PlannerSettings implements Context{
   public static final OptionValidator HEP_PARTITION_PRUNING = new BooleanValidator("planner.enable_hep_partition_pruning", true);
   public static final OptionValidator PLANNER_MEMORY_LIMIT = new RangeLongValidator("planner.memory_limit",
       INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES, MAX_OFF_HEAP_ALLOCATION_IN_BYTES, DEFAULT_MAX_OFF_HEAP_ALLOCATION_IN_BYTES);
+  public static final String UNIONALL_DISTRIBUTE_KEY = "planner.enable_unionall_distribute";
+  public static final BooleanValidator UNIONALL_DISTRIBUTE = new BooleanValidator(UNIONALL_DISTRIBUTE_KEY, false);
 
   public static final OptionValidator IDENTIFIER_MAX_LENGTH =
       new RangeLongValidator("planner.identifier_max_length", 128 /* A minimum length is needed because option names are identifiers themselves */,
@@ -241,6 +243,10 @@ public class PlannerSettings implements Context{
     return options.getOption(IN_SUBQUERY_THRESHOLD);
   }
 
+  public boolean isUnionAllDistributeEnabled() {
+    return options.getOption(UNIONALL_DISTRIBUTE);
+  }
+
   @Override
   public <T> T unwrap(Class<T> clazz) {
     if(clazz == PlannerSettings.class){

http://git-wip-us.apache.org/repos/asf/drill/blob/0ccc81ae/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
index 95feddd..9d93220 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
@@ -25,9 +25,11 @@ import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.util.trace.CalciteTrace;
 import org.apache.drill.exec.planner.logical.DrillUnionRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -53,19 +55,50 @@ public class UnionAllPrule extends Prule {
     final DrillUnionRel union = (DrillUnionRel) call.rel(0);
     final List<RelNode> inputs = union.getInputs();
     List<RelNode> convertedInputList = Lists.newArrayList();
-    RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
+    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    boolean allHashDistributed = true;
 
-    try {
-      for (int i = 0; i < inputs.size(); i++) {
-        RelNode convertedInput = convert(inputs.get(i), PrelUtil.fixTraits(call, traits));
-        convertedInputList.add(convertedInput);
+    for (int i = 0; i < inputs.size(); i++) {
+      RelNode child = inputs.get(i);
+      List<DistributionField> childDistFields = Lists.newArrayList();
+      RelNode convertedChild;
+
+      for (RelDataTypeField f : child.getRowType().getFieldList()) {
+        childDistFields.add(new DistributionField(f.getIndex()));
+      }
+
+      if (settings.isUnionAllDistributeEnabled()) {
+        /*
+         * Strictly speaking, union-all does not need re-distribution of data; but in Drill's execution
+         * model, the same exchange operators provide both data distribution and parallelism. Here, we
+         * insert a hash distribution operator so that parallelism can be determined independently for
+         * the parent and the children (see DRILL-4833).
+         * Note that a round-robin distribution would have sufficed, but Drill does not have one.
+         */
+        DrillDistributionTrait hashChild = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(childDistFields));
+        RelTraitSet traitsChild = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashChild);
+        convertedChild = convert(child, PrelUtil.fixTraits(call, traitsChild));
+      } else {
+        RelTraitSet traitsChild = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
+        convertedChild = convert(child, PrelUtil.fixTraits(call, traitsChild));
+        allHashDistributed = false;
       }
+      convertedInputList.add(convertedChild);
+    }
+
+    try {
 
-      // output distribution trait is set to ANY since union-all inputs may be distributed in different ways
-      // and unlike a join there are no join keys that allow determining how the output would be distributed.
-      // Note that a downstream operator may impose a required distribution which would be satisfied by
-      // inserting an Exchange after the Union-All.
-      traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.ANY);
+      RelTraitSet traits;
+      if (allHashDistributed) {
+        // since all children of union-all are hash distributed, propagate the traits of the first (left-most) child
+        traits = convertedInputList.get(0).getTraitSet();
+      } else {
+        // output distribution trait is set to ANY since union-all inputs may be distributed in different ways
+        // and unlike a join there are no join keys that allow determining how the output would be distributed.
+        // Note that a downstream operator may impose a required distribution which would be satisfied by
+        // inserting an Exchange after the Union-All.
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.ANY);
+      }
 
       Preconditions.checkArgument(convertedInputList.size() >= 2, "Union list must be at least two items.");
       RelNode left = convertedInputList.get(0);

http://git-wip-us.apache.org/repos/asf/drill/blob/0ccc81ae/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 71d9f0a..115ea47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -87,6 +87,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       PlannerSettings.FILTER_MAX_SELECTIVITY_ESTIMATE_FACTOR,
       PlannerSettings.TYPE_INFERENCE,
       PlannerSettings.IN_SUBQUERY_THRESHOLD,
+      PlannerSettings.UNIONALL_DISTRIBUTE,
       ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/0ccc81ae/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index e4168c6..adada23 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -35,6 +35,8 @@ public class TestUnionAll extends BaseTestQuery{
 
   private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
   private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";
+  private static final String enableDistribute = "alter session set `planner.enable_unionall_distribute` = true";
+  private static final String defaultDistribute = "alter session reset `planner.enable_unionall_distribute`";
 
   @Test  // Simple Union-All over two scans
   public void testUnionAll1() throws Exception {
@@ -1111,4 +1113,71 @@ public class TestUnionAll extends BaseTestQuery{
     }
   }
 
+  @Test // DRILL-4833  // limit 1 is on RHS of union-all
+  public void testDrill4833_1() throws Exception {
+    final String l = FileUtils.getResourceAsFile("/multilevel/parquet/1994").toURI().toString();
+    final String r = FileUtils.getResourceAsFile("/multilevel/parquet/1995").toURI().toString();
+
+    final String query = String.format("SELECT o_custkey FROM \n" +
+        " ((select o1.o_custkey from dfs_test.`%s` o1 inner join dfs_test.`%s` o2 on o1.o_orderkey = o2.o_custkey) \n" +
+        " Union All (SELECT o_custkey FROM dfs_test.`%s` limit 1))", l, r, l);
+
+    // Validate the plan
+    final String[] expectedPlan = {"(?s)UnionExchange.*UnionAll.*HashJoin.*"};
+    final String[] excludedPlan = {};
+
+    try {
+      test(sliceTargetSmall);
+      test(enableDistribute);
+
+      PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
+
+      testBuilder()
+        .optionSettingQueriesForTestQuery(sliceTargetSmall)
+        .optionSettingQueriesForBaseline(sliceTargetDefault)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+    } finally {
+      test(sliceTargetDefault);
+      test(defaultDistribute);
+    }
+  }
+
+  @Test // DRILL-4833  // limit 1 is on LHS of union-all
+  public void testDrill4833_2() throws Exception {
+    final String l = FileUtils.getResourceAsFile("/multilevel/parquet/1994").toURI().toString();
+    final String r = FileUtils.getResourceAsFile("/multilevel/parquet/1995").toURI().toString();
+
+    final String query = String.format("SELECT o_custkey FROM \n" +
+        " ((SELECT o_custkey FROM dfs_test.`%s` limit 1) \n" +
+        " union all \n" +
+        " (select o1.o_custkey from dfs_test.`%s` o1 inner join dfs_test.`%s` o2 on o1.o_orderkey = o2.o_custkey))", l, r, l);
+
+    // Validate the plan
+    final String[] expectedPlan = {"(?s)UnionExchange.*UnionAll.*HashJoin.*"};
+    final String[] excludedPlan = {};
+
+    try {
+      test(sliceTargetSmall);
+      test(enableDistribute);
+
+      PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
+
+      testBuilder()
+        .optionSettingQueriesForTestQuery(sliceTargetSmall)
+        .optionSettingQueriesForBaseline(sliceTargetDefault)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+    } finally {
+      test(sliceTargetDefault);
+      test(defaultDistribute);
+    }
+  }
+
 }
\ No newline at end of file