You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/13 02:56:32 UTC

[09/11] git commit: DRILL-690: Generate 2 phase plans for Hash Aggr and Streaming Aggr only if the aggr functions are SUM, MIN, MAX.

DRILL-690: Generate 2 phase plans for Hash Aggr and Streaming Aggr only if the aggr functions are SUM, MIN, MAX.

Added session options for enabling/disabling aggrs, joins and multiphase aggrs.  Modified DrillRuleSet to populate the rules based on options rather than from static list.  Added matches() implementation for join and aggr rules.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e4d3c7ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e4d3c7ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e4d3c7ca

Branch: refs/heads/master
Commit: e4d3c7ca9e2cc20defb5fae7c0373d982147cabb
Parents: a50ab2b
Author: Aman Sinha <as...@maprtech.com>
Authored: Sun May 11 17:57:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 12:19:22 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/logical/DrillRuleSets.java     | 55 ++++++++++++----
 .../exec/planner/physical/AggPruleBase.java     | 18 ++++++
 .../exec/planner/physical/HashAggPrule.java     | 56 +++++++++++++---
 .../exec/planner/physical/HashJoinPrule.java    |  5 ++
 .../exec/planner/physical/MergeJoinPrule.java   |  5 ++
 .../exec/planner/physical/PlannerSettings.java  | 29 ++++++++-
 .../drill/exec/planner/physical/PrelUtil.java   |  5 ++
 .../exec/planner/physical/StreamAggPrule.java   | 67 ++++++++++++--------
 .../drill/exec/planner/sql/DrillSqlWorker.java  | 12 ++--
 .../server/options/SystemOptionManager.java     |  7 +-
 10 files changed, 204 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 0d6da25..c07fee3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -17,22 +17,14 @@
  */
 package org.apache.drill.exec.planner.logical;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import net.hydromatic.optiq.tools.RuleSet;
 
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.physical.*;
-import org.apache.drill.exec.planner.physical.FilterPrule;
-import org.apache.drill.exec.planner.physical.HashAggPrule;
-import org.apache.drill.exec.planner.physical.HashJoinPrule;
-import org.apache.drill.exec.planner.physical.LimitPrule;
-import org.apache.drill.exec.planner.physical.MergeJoinPrule;
-import org.apache.drill.exec.planner.physical.ProjectPrule;
-import org.apache.drill.exec.planner.physical.ScanPrule;
-import org.apache.drill.exec.planner.physical.ScreenPrule;
-import org.apache.drill.exec.planner.physical.SortConvertPrule;
-import org.apache.drill.exec.planner.physical.SortPrule;
-import org.apache.drill.exec.planner.physical.StreamAggPrule;
 import org.eigenbase.rel.RelFactories;
 import org.eigenbase.rel.rules.MergeProjectRule;
 import org.eigenbase.rel.rules.PushFilterPastJoinRule;
@@ -102,6 +94,7 @@ public class DrillRuleSets {
       MergeProjectRule.INSTANCE
       ));
 
+  /* 
   public static final RuleSet DRILL_PHYSICAL_MEM = new DrillRuleSet(ImmutableSet.of( //
 //      DrillScanRule.INSTANCE,
 //      DrillFilterRule.INSTANCE,
@@ -150,12 +143,50 @@ public class DrillRuleSets {
 //    PushJoinThroughJoinRule.LEFT, //
 //    PushSortPastProjectRule.INSTANCE, //
     ));
-
+*/
   public static final RuleSet DRILL_PHYSICAL_DISK = new DrillRuleSet(ImmutableSet.of( //
       ProjectPrule.INSTANCE
 
     ));
 
+  public static final RuleSet getPhysicalRules(QueryContext qcontext) {
+    List<RelOptRule> ruleList = new ArrayList<RelOptRule>(); 
+
+    
+    ruleList.add(ConvertCountToDirectScan.AGG_ON_PROJ_ON_SCAN);
+    ruleList.add(ConvertCountToDirectScan.AGG_ON_SCAN);
+    ruleList.add(SortConvertPrule.INSTANCE);
+    ruleList.add(SortPrule.INSTANCE);
+    ruleList.add(ProjectPrule.INSTANCE);
+    ruleList.add(ScanPrule.INSTANCE);
+    ruleList.add(ScreenPrule.INSTANCE);
+    ruleList.add(ExpandConversionRule.INSTANCE);
+    ruleList.add(FilterPrule.INSTANCE);
+    ruleList.add(LimitPrule.INSTANCE);
+    ruleList.add(WriterPrule.INSTANCE);
+    ruleList.add(PushLimitToTopN.INSTANCE);
+    
+    PlannerSettings ps = qcontext.getPlannerSettings();
+    
+    if (ps.isHashAggEnabled()) {
+      ruleList.add(HashAggPrule.INSTANCE);        
+    }
+    
+    if (ps.isStreamAggEnabled()) {
+      ruleList.add(StreamAggPrule.INSTANCE);        
+    }
+    
+    if (ps.isHashJoinEnabled()) {
+      ruleList.add(HashJoinPrule.INSTANCE);        
+    }
+    
+    if (ps.isMergeJoinEnabled()) {
+      ruleList.add(MergeJoinPrule.INSTANCE);        
+    }
+  
+    return new DrillRuleSet(ImmutableSet.copyOf(ruleList)); 
+  }
+
   public static RuleSet create(ImmutableSet<RelOptRule> rules) {
     return new DrillRuleSet(rules);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 235018d..3bdcc2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -24,7 +24,9 @@ import net.hydromatic.optiq.util.BitSets;
 
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.rel.AggregateCall;
 import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelOptRuleOperand;
 
 import com.google.common.collect.Lists;
@@ -54,4 +56,20 @@ public abstract class AggPruleBase extends RelOptRule {
     return groupByFields;
   }
   
+  // Create 2 phase aggr plan for aggregates such as SUM, MIN, MAX
+  // If any of the aggregate functions are not one of these, then we 
+  // currently won't generate a 2 phase plan. 
+  protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
+    if (! PrelUtil.getPlannerSettings(call.getPlanner()).isMultiPhaseAggEnabled()) {
+      return false;
+    }
+    
+    for (AggregateCall aggCall : aggregate.getAggCallList()) {
+      String name = aggCall.getAggregation().getName();
+      if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) {
+        return false;
+      }
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 22b33ea..859941b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -27,6 +27,7 @@ import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
 import org.eigenbase.trace.EigenbaseTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -40,6 +41,11 @@ public class HashAggPrule extends AggPruleBase {
   }
 
   @Override
+  public boolean matches(RelOptRuleCall call) {
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isHashAggEnabled();
+  }
+  
+  @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
     final RelNode input = call.rel(1);
@@ -50,30 +56,60 @@ public class HashAggPrule extends AggPruleBase {
       return;
     }
     
-    DrillDistributionTrait toDist = null;
     RelTraitSet traits = null;
 
     try {
       if (aggregate.getGroupSet().isEmpty()) {
-        toDist = DrillDistributionTrait.SINGLETON;
-        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+        DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
         createTransformRequest(call, aggregate, input, traits);
       } else {
         // hash distribute on all grouping keys
-        toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
-                                            ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
+        DrillDistributionTrait distOnAllKeys = 
+            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
+                                       ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
     
-        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys);
         createTransformRequest(call, aggregate, input, traits);
 
         // hash distribute on single grouping key
-        toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
-                                            ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
+        DrillDistributionTrait distOnOneKey = 
+            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
+                                       ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
     
-        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnOneKey);
         createTransformRequest(call, aggregate, input, traits);
         
-        ///TODO: 2 phase hash aggregate plan 
+        if (create2PhasePlan(call, aggregate)) {
+          traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
+
+          RelNode convertedInput = convert(input, traits);  
+
+          if (convertedInput instanceof RelSubset) {
+            RelSubset subset = (RelSubset) convertedInput;
+            for (RelNode rel : subset.getRelList()) {
+              if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
+                DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+                traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+                RelNode newInput = convert(input, traits);
+
+                HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
+
+                HashToRandomExchangePrel exch =
+                    new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+                        phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
+
+                HashAggPrel phase2Agg =  new HashAggPrel(aggregate.getCluster(), traits, exch,
+                                                         aggregate.getGroupSet(),
+                                                         aggregate.getAggCallList());
+
+                call.transformTo(phase2Agg);                   
+              }
+            }
+          }    
+        }
       } 
     } catch (InvalidRelException e) {
       tracer.warning(e.toString());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index 5e3ace2..851877f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -41,6 +41,11 @@ public class HashJoinPrule extends JoinPruleBase {
   }
 
   @Override
+  public boolean matches(RelOptRuleCall call) {
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isHashJoinEnabled();
+  }
+  
+  @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillJoinRel join = (DrillJoinRel) call.rel(0);
     final RelNode left = call.rel(1);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index 30e2a97..30f651c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -46,6 +46,11 @@ public class MergeJoinPrule extends JoinPruleBase {
   }
 
   @Override
+  public boolean matches(RelOptRuleCall call) {
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isMergeJoinEnabled();
+  }
+  
+  @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillJoinRel join = (DrillJoinRel) call.rel(0);
     final RelNode left = join.getLeft();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index ae895da..0f694af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -30,8 +30,13 @@ public class PlannerSettings implements FrameworkContext{
   private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
 
   public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
-
-  public OptionManager options;
+  public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
+  public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);  
+  public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);  
+  public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);  
+  public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);  
+  
+  public OptionManager options = null;
 
   public PlannerSettings(OptionManager options){
     this.options = options;
@@ -57,6 +62,26 @@ public class PlannerSettings implements FrameworkContext{
     this.useDefaultCosting = defcost;
   }
   
+  public boolean isHashAggEnabled() {
+    return options.getOption(HASHAGG.getOptionName()).bool_val;  
+  }
+  
+  public boolean isStreamAggEnabled() {
+    return options.getOption(STREAMAGG.getOptionName()).bool_val;  
+  }
+  
+  public boolean isHashJoinEnabled() {
+    return options.getOption(HASHJOIN.getOptionName()).bool_val;
+  }
+  
+  public boolean isMergeJoinEnabled() {
+    return options.getOption(MERGEJOIN.getOptionName()).bool_val;  
+  }
+  
+  public boolean isMultiPhaseAggEnabled() {
+    return options.getOption(MULTIPHASE.getOptionName()).bool_val;
+  }
+  
   @Override
   public <T> T unwrap(Class<T> clazz) {
     if(clazz == PlannerSettings.class){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index bdc8b31..9ca9fbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelFieldCollation;
 import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.reltype.RelDataType;
 import org.eigenbase.rex.RexCall;
 import org.eigenbase.rex.RexInputRef;
@@ -86,6 +87,10 @@ public class PrelUtil {
   public static PlannerSettings getSettings(RelOptCluster cluster){
     return cluster.getPlanner().getFrameworkContext().unwrap(PlannerSettings.class);
   }
+  
+  public static PlannerSettings getPlannerSettings(RelOptPlanner planner) {
+    return planner.getFrameworkContext().unwrap(PlannerSettings.class);
+  }
 
   public static PhysicalOperator removeSvIfRequired(PhysicalOperator child, SelectionVectorMode... allowed){
     SelectionVectorMode current = child.getSVMode();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 62b9aa5..bccdea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.rel.AggregateCall;
 import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelCollationImpl;
@@ -34,6 +35,7 @@ import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
 import org.eigenbase.trace.EigenbaseTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -48,6 +50,11 @@ public class StreamAggPrule extends AggPruleBase {
   }
 
   @Override
+  public boolean matches(RelOptRuleCall call) {
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isStreamAggEnabled();
+  }
+  
+  @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
     final RelNode input = aggregate.getChild();
@@ -82,31 +89,41 @@ public class StreamAggPrule extends AggPruleBase {
         // might be causing some problem. 
         /// TODO: re-enable this plan after resolving the issue.  
         // createTransformRequest(call, aggregate, input, traits);
-       
-        
- /*       
-        // create a 2-phase plan - commented out for now until we resolve planning for 'ANY' distribution 
-        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(DrillDistributionTrait.ANY);
-
-        RelNode convertedInput = convert(input, traits);
-        StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput,
-                                                    aggregate.getGroupSet(),
-                                                    aggregate.getAggCallList());
-
-        int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
-        
-        HashToMergeExchangePrel exch =
-            new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
-                                        phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
-                                        collation,
-                                        numEndPoints);
-        
-        StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), traits, exch,
-                                                     aggregate.getGroupSet(),
-                                                     aggregate.getAggCallList());
-
-        call.transformTo(phase2Agg);      
-  */     
+ 
+        if (create2PhasePlan(call, aggregate)) {
+          traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
+
+          RelNode convertedInput = convert(input, traits);  
+
+          if (convertedInput instanceof RelSubset) {
+            RelSubset subset = (RelSubset) convertedInput;
+            for (RelNode rel : subset.getRelList()) {
+              if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
+                DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);              
+                traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist);
+                RelNode newInput = convert(input, traits);
+
+                StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
+
+                int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+                HashToMergeExchangePrel exch =
+                    new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+                        phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
+                        collation,
+                        numEndPoints);
+
+                StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), traits, exch,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
+
+                call.transformTo(phase2Agg);                   
+              }
+            }
+          }    
+        }
       } 
     } catch (InvalidRelException e) {
       tracer.warning(e.toString());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index bd57785..7477440 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -86,20 +86,22 @@ public class DrillSqlWorker {
         .traitDefs(traitDefs) //
         .convertletTable(new DrillConvertletTable()) //
         .context(context.getPlannerSettings()) //
-        .ruleSets(getRules(context.getStorage())) //
+        .ruleSets(getRules(context)) //
         .costFactory(costFactory) //
         .build();
     this.planner = Frameworks.getPlanner(config);
 
   }
 
-  private static RuleSet[] getRules(StoragePluginRegistry storagePluginRegistry) {
+  private static RuleSet[] getRules(QueryContext context) {
+    StoragePluginRegistry storagePluginRegistry = context.getStorage();
     if (allRules == null) {
       synchronized (DrillSqlWorker.class) {
         if (allRules == null) {
-          RuleSet dirllPhysicalMem = DrillRuleSets.mergedRuleSets(
-              DrillRuleSets.DRILL_PHYSICAL_MEM, storagePluginRegistry.getStoragePluginRuleSet());
-          allRules = new RuleSet[] {DrillRuleSets.DRILL_BASIC_RULES, dirllPhysicalMem};
+          RuleSet drillPhysicalMem = DrillRuleSets.mergedRuleSets(
+              DrillRuleSets.getPhysicalRules(context),
+              storagePluginRegistry.getStoragePluginRuleSet());
+          allRules = new RuleSet[] {DrillRuleSets.DRILL_BASIC_RULES, drillPhysicalMem};
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 98975e4..cfe8e2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -33,7 +33,12 @@ import com.google.common.collect.Maps;
 public class SystemOptionManager implements OptionManager{
 
   private final OptionValidator[] VALIDATORS = {
-      PlannerSettings.EXCHANGE
+      PlannerSettings.EXCHANGE, 
+      PlannerSettings.HASHAGG,
+      PlannerSettings.STREAMAGG,
+      PlannerSettings.HASHJOIN,
+      PlannerSettings.MERGEJOIN, 
+      PlannerSettings.MULTIPHASE
   };
 
   private DistributedMap<OptionValue> options;