You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2016/08/24 16:33:26 UTC
drill git commit: DRILL-4833: Insert exchanges on the inputs of
union-all such that the parent and children can be independently
parallelized.
Repository: drill
Updated Branches:
refs/heads/master d5e74b61b -> 0ccc81aee
DRILL-4833: Insert exchanges on the inputs of union-all such that the parent and children can be independently parallelized.
Add planner option to enable/disable distribution for union-all.
close apache/drill#566
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0ccc81ae
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0ccc81ae
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0ccc81ae
Branch: refs/heads/master
Commit: 0ccc81aee22d74743c801f2a483f1aefdb96c87d
Parents: d5e74b6
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed Aug 10 08:38:25 2016 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Wed Aug 24 09:29:58 2016 -0700
----------------------------------------------------------------------
.../exec/planner/physical/PlannerSettings.java | 6 ++
.../exec/planner/physical/UnionAllPrule.java | 53 ++++++++++++---
.../server/options/SystemOptionManager.java | 1 +
.../java/org/apache/drill/TestUnionAll.java | 69 ++++++++++++++++++++
4 files changed, 119 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0ccc81ae/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 51d3708..218bf5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -80,6 +80,8 @@ public class PlannerSettings implements Context{
public static final OptionValidator HEP_PARTITION_PRUNING = new BooleanValidator("planner.enable_hep_partition_pruning", true);
public static final OptionValidator PLANNER_MEMORY_LIMIT = new RangeLongValidator("planner.memory_limit",
INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES, MAX_OFF_HEAP_ALLOCATION_IN_BYTES, DEFAULT_MAX_OFF_HEAP_ALLOCATION_IN_BYTES);
+ public static final String UNIONALL_DISTRIBUTE_KEY = "planner.enable_unionall_distribute";
+ public static final BooleanValidator UNIONALL_DISTRIBUTE = new BooleanValidator(UNIONALL_DISTRIBUTE_KEY, false);
public static final OptionValidator IDENTIFIER_MAX_LENGTH =
new RangeLongValidator("planner.identifier_max_length", 128 /* A minimum length is needed because option names are identifiers themselves */,
@@ -241,6 +243,10 @@ public class PlannerSettings implements Context{
return options.getOption(IN_SUBQUERY_THRESHOLD);
}
+ public boolean isUnionAllDistributeEnabled() { // accessor for the union-all distribution planner option
+ return options.getOption(UNIONALL_DISTRIBUTE); // option key: planner.enable_unionall_distribute
+ }
+
@Override
public <T> T unwrap(Class<T> clazz) {
if(clazz == PlannerSettings.class){
http://git-wip-us.apache.org/repos/asf/drill/blob/0ccc81ae/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
index 95feddd..9d93220 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
@@ -25,9 +25,11 @@ import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.trace.CalciteTrace;
import org.apache.drill.exec.planner.logical.DrillUnionRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -53,19 +55,50 @@ public class UnionAllPrule extends Prule {
final DrillUnionRel union = (DrillUnionRel) call.rel(0);
final List<RelNode> inputs = union.getInputs();
List<RelNode> convertedInputList = Lists.newArrayList();
- RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
+ PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ boolean allHashDistributed = true;
- try {
- for (int i = 0; i < inputs.size(); i++) {
- RelNode convertedInput = convert(inputs.get(i), PrelUtil.fixTraits(call, traits));
- convertedInputList.add(convertedInput);
+ for (int i = 0; i < inputs.size(); i++) {
+ RelNode child = inputs.get(i);
+ List<DistributionField> childDistFields = Lists.newArrayList();
+ RelNode convertedChild;
+
+ for (RelDataTypeField f : child.getRowType().getFieldList()) {
+ childDistFields.add(new DistributionField(f.getIndex()));
+ }
+
+ if (settings.isUnionAllDistributeEnabled()) {
+ /*
+ * Strictly speaking, union-all does not need re-distribution of data; but in Drill's execution
+ * model, the data distribution and parallelism operators are the same. Here, we insert a
+ * hash distribution operator to allow parallelism to be determined independently for the parent
+ * and children. (See DRILL-4833).
+ * Note that a round robin distribution would have sufficed but we don't have one.
+ */
+ DrillDistributionTrait hashChild = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(childDistFields));
+ RelTraitSet traitsChild = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashChild);
+ convertedChild = convert(child, PrelUtil.fixTraits(call, traitsChild));
+ } else {
+ RelTraitSet traitsChild = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
+ convertedChild = convert(child, PrelUtil.fixTraits(call, traitsChild));
+ allHashDistributed = false;
}
+ convertedInputList.add(convertedChild);
+ }
+
+ try {
- // output distribution trait is set to ANY since union-all inputs may be distributed in different ways
- // and unlike a join there are no join keys that allow determining how the output would be distributed.
- // Note that a downstream operator may impose a required distribution which would be satisfied by
- // inserting an Exchange after the Union-All.
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.ANY);
+ RelTraitSet traits;
+ if (allHashDistributed) {
+ // since all children of union-all are hash distributed, propagate the traits of the left child
+ traits = convertedInputList.get(0).getTraitSet();
+ } else {
+ // output distribution trait is set to ANY since union-all inputs may be distributed in different ways
+ // and unlike a join there are no join keys that allow determining how the output would be distributed.
+ // Note that a downstream operator may impose a required distribution which would be satisfied by
+ // inserting an Exchange after the Union-All.
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.ANY);
+ }
Preconditions.checkArgument(convertedInputList.size() >= 2, "Union list must be at least two items.");
RelNode left = convertedInputList.get(0);
http://git-wip-us.apache.org/repos/asf/drill/blob/0ccc81ae/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 71d9f0a..115ea47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -87,6 +87,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
PlannerSettings.FILTER_MAX_SELECTIVITY_ESTIMATE_FACTOR,
PlannerSettings.TYPE_INFERENCE,
PlannerSettings.IN_SUBQUERY_THRESHOLD,
+ PlannerSettings.UNIONALL_DISTRIBUTE,
ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
http://git-wip-us.apache.org/repos/asf/drill/blob/0ccc81ae/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index e4168c6..adada23 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -35,6 +35,8 @@ public class TestUnionAll extends BaseTestQuery{
private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";
+ private static final String enableDistribute = "alter session set `planner.enable_unionall_distribute` = true";
+ private static final String defaultDistribute = "alter session reset `planner.enable_unionall_distribute`";
@Test // Simple Union-All over two scans
public void testUnionAll1() throws Exception {
@@ -1111,4 +1113,71 @@ public class TestUnionAll extends BaseTestQuery{
}
}
+ @Test // DRILL-4833: the join is the left input of the union-all and the LIMIT 1 subquery is the right input
+ public void testDrill4833_1() throws Exception {
+ final String l = FileUtils.getResourceAsFile("/multilevel/parquet/1994").toURI().toString();
+ final String r = FileUtils.getResourceAsFile("/multilevel/parquet/1995").toURI().toString();
+
+ final String query = String.format("SELECT o_custkey FROM \n" +
+ " ((select o1.o_custkey from dfs_test.`%s` o1 inner join dfs_test.`%s` o2 on o1.o_orderkey = o2.o_custkey) \n" +
+ " Union All (SELECT o_custkey FROM dfs_test.`%s` limit 1))", l, r, l);
+
+ // Validate the plan: its text must contain UnionExchange, then UnionAll, then HashJoin, in that order
+ final String[] expectedPlan = {"(?s)UnionExchange.*UnionAll.*HashJoin.*"};
+ final String[] excludedPlan = {}; // no patterns are excluded
+
+ try {
+ test(sliceTargetSmall); // sets planner.slice_target = 1 for the session
+ test(enableDistribute); // sets planner.enable_unionall_distribute = true for the session
+
+ PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
+
+ testBuilder()
+ .optionSettingQueriesForTestQuery(sliceTargetSmall)
+ .optionSettingQueriesForBaseline(sliceTargetDefault) // baseline is the same query with slice_target reset
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally { // reset both session options changed above
+ test(sliceTargetDefault);
+ test(defaultDistribute);
+ }
+ }
+
+ @Test // DRILL-4833: the LIMIT 1 subquery is the left input of the union-all and the join is the right input
+ public void testDrill4833_2() throws Exception {
+ final String l = FileUtils.getResourceAsFile("/multilevel/parquet/1994").toURI().toString();
+ final String r = FileUtils.getResourceAsFile("/multilevel/parquet/1995").toURI().toString();
+
+ final String query = String.format("SELECT o_custkey FROM \n" +
+ " ((SELECT o_custkey FROM dfs_test.`%s` limit 1) \n" +
+ " union all \n" +
+ " (select o1.o_custkey from dfs_test.`%s` o1 inner join dfs_test.`%s` o2 on o1.o_orderkey = o2.o_custkey))", l, r, l);
+
+ // Validate the plan: its text must contain UnionExchange, then UnionAll, then HashJoin, in that order
+ final String[] expectedPlan = {"(?s)UnionExchange.*UnionAll.*HashJoin.*"};
+ final String[] excludedPlan = {}; // no patterns are excluded
+
+ try {
+ test(sliceTargetSmall); // sets planner.slice_target = 1 for the session
+ test(enableDistribute); // sets planner.enable_unionall_distribute = true for the session
+
+ PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
+
+ testBuilder()
+ .optionSettingQueriesForTestQuery(sliceTargetSmall)
+ .optionSettingQueriesForBaseline(sliceTargetDefault) // baseline is the same query with slice_target reset
+ .unOrdered()
+ .sqlQuery(query)
+ .sqlBaselineQuery(query)
+ .build()
+ .run();
+ } finally { // reset both session options changed above
+ test(sliceTargetDefault);
+ test(defaultDistribute);
+ }
+ }
+
+
}
\ No newline at end of file