You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/13 02:56:32 UTC
[09/11] git commit: DRILL-690: Generate 2 phase plans for Hash Aggr
and Streaming Aggr only if the aggr functions are SUM, MIN, MAX.
DRILL-690: Generate 2 phase plans for Hash Aggr and Streaming Aggr only if the aggr functions are SUM, MIN, MAX.
Added session options for enabling/disabling hash and streaming aggregates, hash and merge joins, and multiphase aggregates. Modified DrillRuleSets to populate the physical rules based on these options rather than from a static list. Added matches() implementations for the join and aggregate rules.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e4d3c7ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e4d3c7ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e4d3c7ca
Branch: refs/heads/master
Commit: e4d3c7ca9e2cc20defb5fae7c0373d982147cabb
Parents: a50ab2b
Author: Aman Sinha <as...@maprtech.com>
Authored: Sun May 11 17:57:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 12:19:22 2014 -0700
----------------------------------------------------------------------
.../exec/planner/logical/DrillRuleSets.java | 55 ++++++++++++----
.../exec/planner/physical/AggPruleBase.java | 18 ++++++
.../exec/planner/physical/HashAggPrule.java | 56 +++++++++++++---
.../exec/planner/physical/HashJoinPrule.java | 5 ++
.../exec/planner/physical/MergeJoinPrule.java | 5 ++
.../exec/planner/physical/PlannerSettings.java | 29 ++++++++-
.../drill/exec/planner/physical/PrelUtil.java | 5 ++
.../exec/planner/physical/StreamAggPrule.java | 67 ++++++++++++--------
.../drill/exec/planner/sql/DrillSqlWorker.java | 12 ++--
.../server/options/SystemOptionManager.java | 7 +-
10 files changed, 204 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 0d6da25..c07fee3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -17,22 +17,14 @@
*/
package org.apache.drill.exec.planner.logical;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import net.hydromatic.optiq.tools.RuleSet;
+import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.physical.*;
-import org.apache.drill.exec.planner.physical.FilterPrule;
-import org.apache.drill.exec.planner.physical.HashAggPrule;
-import org.apache.drill.exec.planner.physical.HashJoinPrule;
-import org.apache.drill.exec.planner.physical.LimitPrule;
-import org.apache.drill.exec.planner.physical.MergeJoinPrule;
-import org.apache.drill.exec.planner.physical.ProjectPrule;
-import org.apache.drill.exec.planner.physical.ScanPrule;
-import org.apache.drill.exec.planner.physical.ScreenPrule;
-import org.apache.drill.exec.planner.physical.SortConvertPrule;
-import org.apache.drill.exec.planner.physical.SortPrule;
-import org.apache.drill.exec.planner.physical.StreamAggPrule;
import org.eigenbase.rel.RelFactories;
import org.eigenbase.rel.rules.MergeProjectRule;
import org.eigenbase.rel.rules.PushFilterPastJoinRule;
@@ -102,6 +94,7 @@ public class DrillRuleSets {
MergeProjectRule.INSTANCE
));
+ /*
public static final RuleSet DRILL_PHYSICAL_MEM = new DrillRuleSet(ImmutableSet.of( //
// DrillScanRule.INSTANCE,
// DrillFilterRule.INSTANCE,
@@ -150,12 +143,50 @@ public class DrillRuleSets {
// PushJoinThroughJoinRule.LEFT, //
// PushSortPastProjectRule.INSTANCE, //
));
-
+*/
public static final RuleSet DRILL_PHYSICAL_DISK = new DrillRuleSet(ImmutableSet.of( //
ProjectPrule.INSTANCE
));
+ public static final RuleSet getPhysicalRules(QueryContext qcontext) {
+ List<RelOptRule> ruleList = new ArrayList<RelOptRule>();
+
+
+ ruleList.add(ConvertCountToDirectScan.AGG_ON_PROJ_ON_SCAN);
+ ruleList.add(ConvertCountToDirectScan.AGG_ON_SCAN);
+ ruleList.add(SortConvertPrule.INSTANCE);
+ ruleList.add(SortPrule.INSTANCE);
+ ruleList.add(ProjectPrule.INSTANCE);
+ ruleList.add(ScanPrule.INSTANCE);
+ ruleList.add(ScreenPrule.INSTANCE);
+ ruleList.add(ExpandConversionRule.INSTANCE);
+ ruleList.add(FilterPrule.INSTANCE);
+ ruleList.add(LimitPrule.INSTANCE);
+ ruleList.add(WriterPrule.INSTANCE);
+ ruleList.add(PushLimitToTopN.INSTANCE);
+
+ PlannerSettings ps = qcontext.getPlannerSettings();
+
+ if (ps.isHashAggEnabled()) {
+ ruleList.add(HashAggPrule.INSTANCE);
+ }
+
+ if (ps.isStreamAggEnabled()) {
+ ruleList.add(StreamAggPrule.INSTANCE);
+ }
+
+ if (ps.isHashJoinEnabled()) {
+ ruleList.add(HashJoinPrule.INSTANCE);
+ }
+
+ if (ps.isMergeJoinEnabled()) {
+ ruleList.add(MergeJoinPrule.INSTANCE);
+ }
+
+ return new DrillRuleSet(ImmutableSet.copyOf(ruleList));
+ }
+
public static RuleSet create(ImmutableSet<RelOptRule> rules) {
return new DrillRuleSet(rules);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 235018d..3bdcc2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -24,7 +24,9 @@ import net.hydromatic.optiq.util.BitSets;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.rel.AggregateCall;
import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelOptRuleOperand;
import com.google.common.collect.Lists;
@@ -54,4 +56,20 @@ public abstract class AggPruleBase extends RelOptRule {
return groupByFields;
}
+ // Create a 2 phase aggr plan only when multiphase aggregation is enabled and
+ // every aggregate function is SUM, MIN or MAX. If any of the aggregate
+ // functions is not one of these, we currently won't generate a 2 phase plan.
+ protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
+ if (! PrelUtil.getPlannerSettings(call.getPlanner()).isMultiPhaseAggEnabled()) {
+ return false;
+ }
+
+ for (AggregateCall aggCall : aggregate.getAggCallList()) {
+ String name = aggCall.getAggregation().getName();
+ if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) {
+ return false;
+ }
+ }
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 22b33ea..859941b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -27,6 +27,7 @@ import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
import org.eigenbase.trace.EigenbaseTrace;
import com.google.common.collect.ImmutableList;
@@ -40,6 +41,11 @@ public class HashAggPrule extends AggPruleBase {
}
@Override
+ public boolean matches(RelOptRuleCall call) {
+ return PrelUtil.getPlannerSettings(call.getPlanner()).isHashAggEnabled();
+ }
+
+ @Override
public void onMatch(RelOptRuleCall call) {
final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
final RelNode input = call.rel(1);
@@ -50,30 +56,60 @@ public class HashAggPrule extends AggPruleBase {
return;
}
- DrillDistributionTrait toDist = null;
RelTraitSet traits = null;
try {
if (aggregate.getGroupSet().isEmpty()) {
- toDist = DrillDistributionTrait.SINGLETON;
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
createTransformRequest(call, aggregate, input, traits);
} else {
// hash distribute on all grouping keys
- toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
- ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
+ DrillDistributionTrait distOnAllKeys =
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys);
createTransformRequest(call, aggregate, input, traits);
// hash distribute on single grouping key
- toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
- ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
+ DrillDistributionTrait distOnOneKey =
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnOneKey);
createTransformRequest(call, aggregate, input, traits);
- ///TODO: 2 phase hash aggregate plan
+ if (create2PhasePlan(call, aggregate)) {
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
+
+ RelNode convertedInput = convert(input, traits);
+
+ if (convertedInput instanceof RelSubset) {
+ RelSubset subset = (RelSubset) convertedInput;
+ for (RelNode rel : subset.getRelList()) {
+ if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ RelNode newInput = convert(input, traits);
+
+ HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList());
+
+ HashToRandomExchangePrel exch =
+ new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+ phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
+
+ HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList());
+
+ call.transformTo(phase2Agg);
+ }
+ }
+ }
+ }
}
} catch (InvalidRelException e) {
tracer.warning(e.toString());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index 5e3ace2..851877f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -41,6 +41,11 @@ public class HashJoinPrule extends JoinPruleBase {
}
@Override
+ public boolean matches(RelOptRuleCall call) {
+ return PrelUtil.getPlannerSettings(call.getPlanner()).isHashJoinEnabled();
+ }
+
+ @Override
public void onMatch(RelOptRuleCall call) {
final DrillJoinRel join = (DrillJoinRel) call.rel(0);
final RelNode left = call.rel(1);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index 30e2a97..30f651c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -46,6 +46,11 @@ public class MergeJoinPrule extends JoinPruleBase {
}
@Override
+ public boolean matches(RelOptRuleCall call) {
+ return PrelUtil.getPlannerSettings(call.getPlanner()).isMergeJoinEnabled();
+ }
+
+ @Override
public void onMatch(RelOptRuleCall call) {
final DrillJoinRel join = (DrillJoinRel) call.rel(0);
final RelNode left = join.getLeft();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index ae895da..0f694af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -30,8 +30,13 @@ public class PlannerSettings implements FrameworkContext{
private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
-
- public OptionManager options;
+ public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
+ public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);
+ public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
+ public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
+ public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
+
+ public OptionManager options = null;
public PlannerSettings(OptionManager options){
this.options = options;
@@ -57,6 +62,26 @@ public class PlannerSettings implements FrameworkContext{
this.useDefaultCosting = defcost;
}
+ public boolean isHashAggEnabled() {
+ return options.getOption(HASHAGG.getOptionName()).bool_val;
+ }
+
+ public boolean isStreamAggEnabled() {
+ return options.getOption(STREAMAGG.getOptionName()).bool_val;
+ }
+
+ public boolean isHashJoinEnabled() {
+ return options.getOption(HASHJOIN.getOptionName()).bool_val;
+ }
+
+ public boolean isMergeJoinEnabled() {
+ return options.getOption(MERGEJOIN.getOptionName()).bool_val;
+ }
+
+ public boolean isMultiPhaseAggEnabled() {
+ return options.getOption(MULTIPHASE.getOptionName()).bool_val;
+ }
+
@Override
public <T> T unwrap(Class<T> clazz) {
if(clazz == PlannerSettings.class){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index bdc8b31..9ca9fbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.eigenbase.rel.RelCollation;
import org.eigenbase.rel.RelFieldCollation;
import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.rex.RexCall;
import org.eigenbase.rex.RexInputRef;
@@ -86,6 +87,10 @@ public class PrelUtil {
public static PlannerSettings getSettings(RelOptCluster cluster){
return cluster.getPlanner().getFrameworkContext().unwrap(PlannerSettings.class);
}
+
+ public static PlannerSettings getPlannerSettings(RelOptPlanner planner) {
+ return planner.getFrameworkContext().unwrap(PlannerSettings.class);
+ }
public static PhysicalOperator removeSvIfRequired(PhysicalOperator child, SelectionVectorMode... allowed){
SelectionVectorMode current = child.getSVMode();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 62b9aa5..bccdea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.rel.AggregateCall;
import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.RelCollation;
import org.eigenbase.rel.RelCollationImpl;
@@ -34,6 +35,7 @@ import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
import org.eigenbase.trace.EigenbaseTrace;
import com.google.common.collect.ImmutableList;
@@ -48,6 +50,11 @@ public class StreamAggPrule extends AggPruleBase {
}
@Override
+ public boolean matches(RelOptRuleCall call) {
+ return PrelUtil.getPlannerSettings(call.getPlanner()).isStreamAggEnabled();
+ }
+
+ @Override
public void onMatch(RelOptRuleCall call) {
final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
final RelNode input = aggregate.getChild();
@@ -82,31 +89,41 @@ public class StreamAggPrule extends AggPruleBase {
// might be causing some problem.
/// TODO: re-enable this plan after resolving the issue.
// createTransformRequest(call, aggregate, input, traits);
-
-
- /*
- // create a 2-phase plan - commented out for now until we resolve planning for 'ANY' distribution
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(DrillDistributionTrait.ANY);
-
- RelNode convertedInput = convert(input, traits);
- StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput,
- aggregate.getGroupSet(),
- aggregate.getAggCallList());
-
- int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
-
- HashToMergeExchangePrel exch =
- new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
- phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
- collation,
- numEndPoints);
-
- StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch,
- aggregate.getGroupSet(),
- aggregate.getAggCallList());
-
- call.transformTo(phase2Agg);
- */
+
+ if (create2PhasePlan(call, aggregate)) {
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
+
+ RelNode convertedInput = convert(input, traits);
+
+ if (convertedInput instanceof RelSubset) {
+ RelSubset subset = (RelSubset) convertedInput;
+ for (RelNode rel : subset.getRelList()) {
+ if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist);
+ RelNode newInput = convert(input, traits);
+
+ StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList());
+
+ int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+ HashToMergeExchangePrel exch =
+ new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+ phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
+ collation,
+ numEndPoints);
+
+ StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList());
+
+ call.transformTo(phase2Agg);
+ }
+ }
+ }
+ }
}
} catch (InvalidRelException e) {
tracer.warning(e.toString());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index bd57785..7477440 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -86,20 +86,22 @@ public class DrillSqlWorker {
.traitDefs(traitDefs) //
.convertletTable(new DrillConvertletTable()) //
.context(context.getPlannerSettings()) //
- .ruleSets(getRules(context.getStorage())) //
+ .ruleSets(getRules(context)) //
.costFactory(costFactory) //
.build();
this.planner = Frameworks.getPlanner(config);
}
- private static RuleSet[] getRules(StoragePluginRegistry storagePluginRegistry) {
+ private static RuleSet[] getRules(QueryContext context) {
+ StoragePluginRegistry storagePluginRegistry = context.getStorage();
if (allRules == null) {
synchronized (DrillSqlWorker.class) {
if (allRules == null) {
- RuleSet dirllPhysicalMem = DrillRuleSets.mergedRuleSets(
- DrillRuleSets.DRILL_PHYSICAL_MEM, storagePluginRegistry.getStoragePluginRuleSet());
- allRules = new RuleSet[] {DrillRuleSets.DRILL_BASIC_RULES, dirllPhysicalMem};
+ RuleSet drillPhysicalMem = DrillRuleSets.mergedRuleSets(
+ DrillRuleSets.getPhysicalRules(context),
+ storagePluginRegistry.getStoragePluginRuleSet());
+ allRules = new RuleSet[] {DrillRuleSets.DRILL_BASIC_RULES, drillPhysicalMem};
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 98975e4..cfe8e2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -33,7 +33,12 @@ import com.google.common.collect.Maps;
public class SystemOptionManager implements OptionManager{
private final OptionValidator[] VALIDATORS = {
- PlannerSettings.EXCHANGE
+ PlannerSettings.EXCHANGE,
+ PlannerSettings.HASHAGG,
+ PlannerSettings.STREAMAGG,
+ PlannerSettings.HASHJOIN,
+ PlannerSettings.MERGEJOIN,
+ PlannerSettings.MULTIPHASE
};
private DistributedMap<OptionValue> options;