You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/20 05:01:57 UTC

[09/14] git commit: DRILL-768: Support 2 phase COUNT() aggregates.

DRILL-768: Support 2 phase COUNT() aggregates.

Create a new SUM aggregate function whose return type is non-nullable to match the return type of COUNT.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/78ae2658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/78ae2658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/78ae2658

Branch: refs/heads/master
Commit: 78ae26589745b0ed538a15644f6fd6897cb9e5ad
Parents: e9ac37d
Author: Aman Sinha <as...@maprtech.com>
Authored: Fri May 16 10:33:10 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 17:46:39 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/AggPrelBase.java      | 180 +++++++++++++++++++
 .../exec/planner/physical/AggPruleBase.java     |   2 +-
 .../exec/planner/physical/HashAggPrel.java      |  51 +-----
 .../exec/planner/physical/HashAggPrule.java     |  10 +-
 .../exec/planner/physical/StreamAggPrel.java    |  72 ++------
 .../exec/planner/physical/StreamAggPrule.java   |  21 ++-
 6 files changed, 221 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
new file mode 100644
index 0000000..c3b1188
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.Aggregation;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.SqlAggFunction;
+import org.eigenbase.sql.SqlFunctionCategory;
+import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.type.OperandTypes;
+import org.eigenbase.sql.type.ReturnTypes;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.ImmutableList;
+
+
+public abstract class AggPrelBase extends AggregateRelBase implements Prel{
+
+  protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
+
+  protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase
+  protected List<NamedExpression> keys = Lists.newArrayList();
+  protected List<NamedExpression> aggExprs = Lists.newArrayList();
+  protected List<AggregateCall> phase2AggCallList = Lists.newArrayList();
+  
+  
+  /**
+   * Specialized aggregate function for SUMing the COUNTs.  Since the return type of
+   * COUNT is non-nullable and the return type of SUM is nullable, this class enables
+   * creating a SUM whose return type is non-nullable.
+   *
+   */
+  public class SqlSumCountAggFunction extends SqlAggFunction {
+ 
+    private final RelDataType type;
+    
+    public SqlSumCountAggFunction(RelDataType type) {
+      super("SUM",
+          SqlKind.OTHER_FUNCTION,
+          ReturnTypes.BIGINT, // use the inferred return type of SqlCountAggFunction
+          null,
+          OperandTypes.NUMERIC,
+          SqlFunctionCategory.NUMERIC);
+      
+      this.type = type;
+    }
+ 
+    public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
+      return ImmutableList.of(type);
+    }
+
+    public RelDataType getType() {
+      return type;
+    }
+
+    public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+      return type;
+    }
+    
+  }
+  
+  public AggPrelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
+      List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+    super(cluster, traits, child, groupSet, aggCalls);
+    this.operPhase = phase;
+    createKeysAndExprs();
+  }
+  
+  public OperatorPhase getOperatorPhase() {
+    return operPhase;  
+  }
+  
+  public List<NamedExpression> getKeys() {
+    return keys;
+  }
+
+  public List<NamedExpression> getAggExprs() {
+    return aggExprs;  
+  }
+  
+  public List<AggregateCall> getPhase2AggCalls() {
+    return phase2AggCallList;  
+  }
+  
+  protected void createKeysAndExprs() {
+    final List<String> childFields = getChild().getRowType().getFieldNames();
+    final List<String> fields = getRowType().getFieldNames();
+
+    for (int group : BitSets.toIter(groupSet)) {
+      FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+      keys.add(new NamedExpression(fr, fr));
+    }
+
+    for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
+      int aggExprOrdinal = groupSet.cardinality() + aggCall.i;
+      FieldReference ref = new FieldReference(fields.get(aggExprOrdinal));
+      LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
+      NamedExpression ne = new NamedExpression(expr, ref);
+      aggExprs.add(ne);
+      
+      if (getOperatorPhase() == OperatorPhase.PHASE_1of2) {
+        if (aggCall.e.getAggregation().getName().equals("COUNT")) {
+          // If we are doing a COUNT aggregate in Phase1of2, then in Phase2of2 we should SUM the COUNTs.
+          Aggregation sumAggFun = new SqlSumCountAggFunction(aggCall.e.getType());
+          AggregateCall newAggCall = 
+              new AggregateCall(
+                  sumAggFun, 
+                  aggCall.e.isDistinct(), 
+                  Collections.singletonList(aggExprOrdinal), 
+                  aggCall.e.getType(),
+                  aggCall.e.getName());
+
+          phase2AggCallList.add(newAggCall); 
+        } else {
+          phase2AggCallList.add(aggCall.e);
+        }
+      }
+    }    
+  }
+  
+  protected LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
+    List<LogicalExpression> args = Lists.newArrayList();    
+    for(Integer i : call.getArgList()){
+      args.add(new FieldReference(fn.get(i)));
+    }
+
+    // for count(1).
+    if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
+    LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
+    return expr;
+  }
+  
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 563458e..4edeaf8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -67,7 +67,7 @@ public abstract class AggPruleBase extends Prule {
 
     for (AggregateCall aggCall : aggregate.getAggCallList()) {
       String name = aggCall.getAggregation().getName();
-      if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) {
+      if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX") || name.equals("COUNT"))) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index b2378be..31feb48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import net.hydromatic.linq4j.Ord;
 import net.hydromatic.optiq.util.BitSets;
@@ -49,18 +50,19 @@ import org.eigenbase.relopt.RelTraitSet;
 
 import com.beust.jcommander.internal.Lists;
 
-public class HashAggPrel extends AggregateRelBase implements Prel{
+public class HashAggPrel extends AggPrelBase implements Prel{
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggPrel.class);
 
   public HashAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
-      List<AggregateCall> aggCalls) throws InvalidRelException {
-    super(cluster, traits, child, groupSet, aggCalls);
+      List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+    super(cluster, traits, child, groupSet, aggCalls, phase);
   }
 
   public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
     try {
-      return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls);
+      return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls, 
+          this.getOperatorPhase());
     } catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
@@ -88,54 +90,16 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
 
-    final List<String> childFields = getChild().getRowType().getFieldNames();
-    final List<String> fields = getRowType().getFieldNames();
-    List<NamedExpression> keys = Lists.newArrayList();
-    List<NamedExpression> exprs = Lists.newArrayList();
-
-    for (int group : BitSets.toIter(groupSet)) {
-      FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
-      keys.add(new NamedExpression(fr, fr));
-    }
-
-    for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
-      FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
-      LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
-      exprs.add(new NamedExpression(expr, ref));
-    }
-
     Prel child = (Prel) this.getChild();
     HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator),
         keys.toArray(new NamedExpression[keys.size()]),
-        exprs.toArray(new NamedExpression[exprs.size()]),
+        aggExprs.toArray(new NamedExpression[aggExprs.size()]),
         1.0f);
 
     return g;
 
   }
 
-  private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
-    List<LogicalExpression> args = Lists.newArrayList();
-    for(Integer i : call.getArgList()){
-      args.add(new FieldReference(fn.get(i)));
-    }
-
-    // for count(1).
-    if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
-    LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
-    return expr;
-  }
-
-  @Override
-  public Iterator<Prel> iterator() {
-    return PrelUtil.iter(getChild());
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
-    return logicalVisitor.visitPrel(this, value);
-  }
-
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
     return SelectionVectorMode.DEFAULT;
@@ -146,5 +110,4 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
     return SelectionVectorMode.NONE;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 9395a1d..95c8362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -22,6 +22,7 @@ import java.util.logging.Logger;
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
 import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
@@ -95,7 +96,8 @@ public class HashAggPrule extends AggPruleBase {
 
                 HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    aggregate.getAggCallList(), 
+                    OperatorPhase.PHASE_1of2);
 
                 HashToRandomExchangePrel exch =
                     new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
@@ -103,7 +105,9 @@ public class HashAggPrule extends AggPruleBase {
 
                 HashAggPrel phase2Agg =  new HashAggPrel(aggregate.getCluster(), traits, exch,
                                                          aggregate.getGroupSet(),
-                                                         aggregate.getAggCallList());
+                                                         phase1Agg.getPhase2AggCalls(), 
+                                                         OperatorPhase.PHASE_2of2); 
+                                                    
 
                 call.transformTo(phase2Agg);
               }
@@ -122,7 +126,7 @@ public class HashAggPrule extends AggPruleBase {
     final RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, traits));
 
     HashAggPrel newAgg = new HashAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
-                                         aggregate.getAggCallList());
+                                         aggregate.getAggCallList(), OperatorPhase.PHASE_1of1);
 
     call.transformTo(newAgg);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
index 5fb758a..9706254 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
@@ -19,24 +19,13 @@ package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
 import java.util.BitSet;
-import java.util.Iterator;
 import java.util.List;
 
-import net.hydromatic.linq4j.Ord;
-import net.hydromatic.optiq.util.BitSets;
-
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.SingleMergeExchange;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.rel.AggregateCall;
 import org.eigenbase.rel.AggregateRelBase;
@@ -48,30 +37,27 @@ import org.eigenbase.relopt.RelOptCost;
 import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTraitSet;
 
-import com.beust.jcommander.internal.Lists;
-
-public class StreamAggPrel extends AggregateRelBase implements Prel{
+public class StreamAggPrel extends AggPrelBase implements Prel{
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamAggPrel.class);
 
+
+  
   public StreamAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
-      List<AggregateCall> aggCalls) throws InvalidRelException {
-    super(cluster, traits, child, groupSet, aggCalls);
-    for (AggregateCall aggCall : aggCalls) {
-      if (aggCall.isDistinct()) {
-        throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates");
-      }
-    }
+      List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+    super(cluster, traits, child, groupSet, aggCalls, phase);
   }
 
   public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
     try {
-      return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls);
+      return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls, 
+          this.getOperatorPhase());
     } catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
   }
 
+
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
@@ -91,51 +77,15 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
-    final List<String> childFields = getChild().getRowType().getFieldNames();
-    final List<String> fields = getRowType().getFieldNames();
-    List<NamedExpression> keys = Lists.newArrayList();
-    List<NamedExpression> exprs = Lists.newArrayList();
-
-    for (int group : BitSets.toIter(groupSet)) {
-      FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
-      keys.add(new NamedExpression(fr, fr));
-    }
-
-    for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
-      FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
-      LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
-      exprs.add(new NamedExpression(expr, ref));
-    }
 
     Prel child = (Prel) this.getChild();
-    StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), exprs.toArray(new NamedExpression[exprs.size()]), 1.0f);
+    StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), 
+        aggExprs.toArray(new NamedExpression[aggExprs.size()]), 1.0f);
 
     return g;
 
   }
-
-  private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
-    List<LogicalExpression> args = Lists.newArrayList();
-    for(Integer i : call.getArgList()){
-      args.add(new FieldReference(fn.get(i)));
-    }
-
-    // for count(1).
-    if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
-    LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
-    return expr;
-  }
-
-  @Override
-  public Iterator<Prel> iterator() {
-    return PrelUtil.iter(getChild());
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
-    return logicalVisitor.visitPrel(this, value);
-  }
-
+  
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
     return SelectionVectorMode.ALL;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index ff648a4..9a60a14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.util.BitSets;
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.eigenbase.rel.AggregateCall;
 import org.eigenbase.rel.InvalidRelException;
@@ -59,8 +60,12 @@ public class StreamAggPrule extends AggPruleBase {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
     final RelNode input = aggregate.getChild();
     RelCollation collation = getCollation(aggregate);
-
     RelTraitSet traits = null;
+    
+    if (aggregate.containsDistinctCall()) {
+      // Currently, don't use StreamingAggregate if any of the logical aggregate calls contains DISTINCT.
+      return;
+    }
 
     try {
       if (aggregate.getGroupSet().isEmpty()) {
@@ -82,14 +87,16 @@ public class StreamAggPrule extends AggPruleBase {
 
                 StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    aggregate.getAggCallList(), 
+                    OperatorPhase.PHASE_1of2);
 
                 UnionExchangePrel exch = 
                     new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
         
                 StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    phase1Agg.getPhase2AggCalls(),
+                    OperatorPhase.PHASE_2of2);
 
                 call.transformTo(phase2Agg);  
               }
@@ -135,7 +142,8 @@ public class StreamAggPrule extends AggPruleBase {
 
                 StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    aggregate.getAggCallList(), 
+                    OperatorPhase.PHASE_1of2);
 
                 int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
 
@@ -147,7 +155,8 @@ public class StreamAggPrule extends AggPruleBase {
 
                 StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), traits, exch,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    phase1Agg.getPhase2AggCalls(), 
+                    OperatorPhase.PHASE_2of2);
 
                 call.transformTo(phase2Agg);                   
               }
@@ -166,7 +175,7 @@ public class StreamAggPrule extends AggPruleBase {
     final RelNode convertedInput = convert(input, traits);
     
     StreamAggPrel newAgg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
-                                             aggregate.getAggCallList());
+                                             aggregate.getAggCallList(), OperatorPhase.PHASE_1of1);
       
     call.transformTo(newAgg);
   }