You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/20 05:01:57 UTC
[09/14] git commit: DRILL-768: Support 2 phase COUNT() aggregates.
DRILL-768: Support 2 phase COUNT() aggregates.
Create a new SUM aggregate function whose return type is non-nullable to match the return type of COUNT.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/78ae2658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/78ae2658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/78ae2658
Branch: refs/heads/master
Commit: 78ae26589745b0ed538a15644f6fd6897cb9e5ad
Parents: e9ac37d
Author: Aman Sinha <as...@maprtech.com>
Authored: Fri May 16 10:33:10 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 17:46:39 2014 -0700
----------------------------------------------------------------------
.../exec/planner/physical/AggPrelBase.java | 180 +++++++++++++++++++
.../exec/planner/physical/AggPruleBase.java | 2 +-
.../exec/planner/physical/HashAggPrel.java | 51 +-----
.../exec/planner/physical/HashAggPrule.java | 10 +-
.../exec/planner/physical/StreamAggPrel.java | 72 ++------
.../exec/planner/physical/StreamAggPrule.java | 21 ++-
6 files changed, 221 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
new file mode 100644
index 0000000..c3b1188
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.Aggregation;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.SqlAggFunction;
+import org.eigenbase.sql.SqlFunctionCategory;
+import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.type.OperandTypes;
+import org.eigenbase.sql.type.ReturnTypes;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.ImmutableList;
+
+
+public abstract class AggPrelBase extends AggregateRelBase implements Prel{
+
+ protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}; // single-phase aggregate, or stage 1 / stage 2 of a two-phase aggregate
+ // The three lists below are filled by createKeysAndExprs(), which the constructor invokes.
+ protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase; overwritten by the constructor argument
+ protected List<NamedExpression> keys = Lists.newArrayList(); // one entry per bit set in groupSet
+ protected List<NamedExpression> aggExprs = Lists.newArrayList(); // one entry per aggregate call
+ protected List<AggregateCall> phase2AggCallList = Lists.newArrayList(); // populated only when operPhase is PHASE_1of2
+
+
+ /**
+  * Specialized aggregate function for SUMing the COUNTs. Since return type of
+  * COUNT is non-nullable and return type of SUM is nullable, this class enables
+  * creating a SUM whose return type is non-nullable.
+  * <p>
+  * Static on purpose: it uses no state of the enclosing rel, and a non-static inner
+  * class would keep a hidden reference to the rel that created it.
+  */
+ public static class SqlSumCountAggFunction extends SqlAggFunction {
+   /** Reported as both the single operand type and the result type. */
+   private final RelDataType type;
+   /** @param type type to report; this file passes the type of the COUNT call being combined */
+   public SqlSumCountAggFunction(RelDataType type) {
+     super("SUM",
+         SqlKind.OTHER_FUNCTION,
+         ReturnTypes.BIGINT, // use the inferred return type of SqlCountAggFunction
+         null,
+         OperandTypes.NUMERIC,
+         SqlFunctionCategory.NUMERIC);
+     this.type = type;
+   }
+   /** Returns a one-element list holding the type given at construction. */
+   public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
+     return ImmutableList.of(type);
+   }
+   /** Returns the type given at construction. */
+   public RelDataType getType() {
+     return type;
+   }
+   /** Returns the type given at construction, ignoring the type factory. */
+   public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+     return type;
+   }
+ }
+ /** Creates the operator for the given phase and immediately builds its key and aggregate expressions. */
+ public AggPrelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
+ List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+ super(cluster, traits, child, groupSet, aggCalls);
+ this.operPhase = phase; // set first: createKeysAndExprs() branches on the phase
+ createKeysAndExprs(); // NOTE(review): overridable method invoked from a constructor
+ }
+ /** Returns the phase this operator was constructed with. */
+ public OperatorPhase getOperatorPhase() {
+ return operPhase;
+ }
+ /** Returns the group-by key expressions; this is the internal list, not a copy. */
+ public List<NamedExpression> getKeys() {
+ return keys;
+ }
+ /** Returns the aggregate expressions; this is the internal list, not a copy. */
+ public List<NamedExpression> getAggExprs() {
+ return aggExprs;
+ }
+ /** Returns the aggregate calls derived for the second phase; empty unless this operator is PHASE_1of2. */
+ public List<AggregateCall> getPhase2AggCalls() {
+ return phase2AggCallList;
+ }
+ /**
+  * Populates {@link #keys} and {@link #aggExprs} from the group set and aggregate calls and,
+  * for PHASE_1of2, derives the aggregate calls that the PHASE_2of2 operator must run.
+  * <p>
+  * Every phase-2 call reads the output column of its own phase-1 aggregate. Reusing the
+  * phase-1 argument ordinals would address the wrong column, because this operator's row
+  * is laid out as group keys followed by one column per aggregate call.
+  */
+ protected void createKeysAndExprs() {
+   final List<String> childFields = getChild().getRowType().getFieldNames();
+   final List<String> fields = getRowType().getFieldNames();
+   final int numKeys = groupSet.cardinality();
+
+   for (int group : BitSets.toIter(groupSet)) {
+     FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+     keys.add(new NamedExpression(fr, fr));
+   }
+
+   for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
+     // Aggregate outputs follow the group keys in this operator's row type.
+     int aggExprOrdinal = numKeys + aggCall.i;
+     FieldReference ref = new FieldReference(fields.get(aggExprOrdinal));
+     LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
+     aggExprs.add(new NamedExpression(expr, ref));
+
+     if (getOperatorPhase() == OperatorPhase.PHASE_1of2) {
+       // Partial COUNTs are combined by SUMming them (keeping COUNT's non-nullable type);
+       // any other aggregate is combined by re-applying the same function.
+       Aggregation phase2Fun = aggCall.e.getAggregation().getName().equals("COUNT")
+           ? new SqlSumCountAggFunction(aggCall.e.getType())
+           : aggCall.e.getAggregation();
+       phase2AggCallList.add(new AggregateCall(phase2Fun, aggCall.e.isDistinct(),
+           Collections.singletonList(aggExprOrdinal), aggCall.e.getType(), aggCall.e.getName()));
+     }
+   }
+ }
+ /** Builds the Drill call for {@code call}; {@code fn} maps argument ordinals to field names, {@code pContext} is unused here. */
+ protected LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
+   List<LogicalExpression> args = Lists.newArrayList();
+   for (Integer i : call.getArgList()) {
+     args.add(new FieldReference(fn.get(i)));
+   }
+   if (args.isEmpty()) { // for count(1): a call without arguments aggregates the literal 1
+     args.add(new ValueExpressions.LongExpression(1L));
+   }
+   // Locale.ROOT: the default locale must not alter the function name (e.g. Turkish dotless i in "min").
+   return new FunctionCall(call.getAggregation().getName().toLowerCase(java.util.Locale.ROOT), args, ExpressionPosition.UNKNOWN);
+ }
+ /** Returns the iterator that PrelUtil.iter builds for this operator's single child. */
+ @Override
+ public Iterator<Prel> iterator() {
+ return PrelUtil.iter(getChild());
+ }
+ /** Dispatches to the visitor's generic visitPrel callback, passing this operator and the caller's value through. */
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitPrel(this, value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 563458e..4edeaf8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -67,7 +67,7 @@ public abstract class AggPruleBase extends Prule {
for (AggregateCall aggCall : aggregate.getAggCallList()) {
String name = aggCall.getAggregation().getName();
- if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) {
+ if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX") || name.equals("COUNT"))) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index b2378be..31feb48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import net.hydromatic.linq4j.Ord;
import net.hydromatic.optiq.util.BitSets;
@@ -49,18 +50,19 @@ import org.eigenbase.relopt.RelTraitSet;
import com.beust.jcommander.internal.Lists;
-public class HashAggPrel extends AggregateRelBase implements Prel{
+public class HashAggPrel extends AggPrelBase implements Prel{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggPrel.class);
public HashAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
- List<AggregateCall> aggCalls) throws InvalidRelException {
- super(cluster, traits, child, groupSet, aggCalls);
+ List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+ super(cluster, traits, child, groupSet, aggCalls, phase);
}
public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
try {
- return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls);
+ return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls,
+ this.getOperatorPhase());
} catch (InvalidRelException e) {
throw new AssertionError(e);
}
@@ -88,54 +90,16 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- final List<String> childFields = getChild().getRowType().getFieldNames();
- final List<String> fields = getRowType().getFieldNames();
- List<NamedExpression> keys = Lists.newArrayList();
- List<NamedExpression> exprs = Lists.newArrayList();
-
- for (int group : BitSets.toIter(groupSet)) {
- FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
- keys.add(new NamedExpression(fr, fr));
- }
-
- for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
- FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
- LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
- exprs.add(new NamedExpression(expr, ref));
- }
-
Prel child = (Prel) this.getChild();
HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator),
keys.toArray(new NamedExpression[keys.size()]),
- exprs.toArray(new NamedExpression[exprs.size()]),
+ aggExprs.toArray(new NamedExpression[aggExprs.size()]),
1.0f);
return g;
}
- private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
- List<LogicalExpression> args = Lists.newArrayList();
- for(Integer i : call.getArgList()){
- args.add(new FieldReference(fn.get(i)));
- }
-
- // for count(1).
- if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
- LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
- return expr;
- }
-
- @Override
- public Iterator<Prel> iterator() {
- return PrelUtil.iter(getChild());
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
- return logicalVisitor.visitPrel(this, value);
- }
-
@Override
public SelectionVectorMode[] getSupportedEncodings() {
return SelectionVectorMode.DEFAULT;
@@ -146,5 +110,4 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
return SelectionVectorMode.NONE;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 9395a1d..95c8362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -22,6 +22,7 @@ import java.util.logging.Logger;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
@@ -95,7 +96,8 @@ public class HashAggPrule extends AggPruleBase {
HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
HashToRandomExchangePrel exch =
new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
@@ -103,7 +105,9 @@ public class HashAggPrule extends AggPruleBase {
HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
+
call.transformTo(phase2Agg);
}
@@ -122,7 +126,7 @@ public class HashAggPrule extends AggPruleBase {
final RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, traits));
HashAggPrel newAgg = new HashAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(), OperatorPhase.PHASE_1of1);
call.transformTo(newAgg);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
index 5fb758a..9706254 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
@@ -19,24 +19,13 @@ package org.apache.drill.exec.planner.physical;
import java.io.IOException;
import java.util.BitSet;
-import java.util.Iterator;
import java.util.List;
-import net.hydromatic.linq4j.Ord;
-import net.hydromatic.optiq.util.BitSets;
-
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.SingleMergeExchange;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.eigenbase.rel.AggregateCall;
import org.eigenbase.rel.AggregateRelBase;
@@ -48,30 +37,27 @@ import org.eigenbase.relopt.RelOptCost;
import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelTraitSet;
-import com.beust.jcommander.internal.Lists;
-
-public class StreamAggPrel extends AggregateRelBase implements Prel{
+public class StreamAggPrel extends AggPrelBase implements Prel{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamAggPrel.class);
+
+
public StreamAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
- List<AggregateCall> aggCalls) throws InvalidRelException {
- super(cluster, traits, child, groupSet, aggCalls);
- for (AggregateCall aggCall : aggCalls) {
- if (aggCall.isDistinct()) {
- throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates");
- }
- }
+ List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+ super(cluster, traits, child, groupSet, aggCalls, phase);
}
public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
try {
- return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls);
+ return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls,
+ this.getOperatorPhase());
} catch (InvalidRelException e) {
throw new AssertionError(e);
}
}
+
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
@@ -91,51 +77,15 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- final List<String> childFields = getChild().getRowType().getFieldNames();
- final List<String> fields = getRowType().getFieldNames();
- List<NamedExpression> keys = Lists.newArrayList();
- List<NamedExpression> exprs = Lists.newArrayList();
-
- for (int group : BitSets.toIter(groupSet)) {
- FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
- keys.add(new NamedExpression(fr, fr));
- }
-
- for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
- FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
- LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
- exprs.add(new NamedExpression(expr, ref));
- }
Prel child = (Prel) this.getChild();
- StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), exprs.toArray(new NamedExpression[exprs.size()]), 1.0f);
+ StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]),
+ aggExprs.toArray(new NamedExpression[aggExprs.size()]), 1.0f);
return g;
}
-
- private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
- List<LogicalExpression> args = Lists.newArrayList();
- for(Integer i : call.getArgList()){
- args.add(new FieldReference(fn.get(i)));
- }
-
- // for count(1).
- if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
- LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
- return expr;
- }
-
- @Override
- public Iterator<Prel> iterator() {
- return PrelUtil.iter(getChild());
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
- return logicalVisitor.visitPrel(this, value);
- }
-
+
@Override
public SelectionVectorMode[] getSupportedEncodings() {
return SelectionVectorMode.ALL;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index ff648a4..9a60a14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.util.BitSets;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.eigenbase.rel.AggregateCall;
import org.eigenbase.rel.InvalidRelException;
@@ -59,8 +60,12 @@ public class StreamAggPrule extends AggPruleBase {
final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
final RelNode input = aggregate.getChild();
RelCollation collation = getCollation(aggregate);
-
RelTraitSet traits = null;
+
+ if (aggregate.containsDistinctCall()) {
+ // currently, don't use StreamingAggregate if any of the logical aggrs contains DISTINCT
+ return;
+ }
try {
if (aggregate.getGroupSet().isEmpty()) {
@@ -82,14 +87,16 @@ public class StreamAggPrule extends AggPruleBase {
StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
UnionExchangePrel exch =
new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
call.transformTo(phase2Agg);
}
@@ -135,7 +142,8 @@ public class StreamAggPrule extends AggPruleBase {
StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
@@ -147,7 +155,8 @@ public class StreamAggPrule extends AggPruleBase {
StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
call.transformTo(phase2Agg);
}
@@ -166,7 +175,7 @@ public class StreamAggPrule extends AggPruleBase {
final RelNode convertedInput = convert(input, traits);
StreamAggPrel newAgg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(), OperatorPhase.PHASE_1of1);
call.transformTo(newAgg);
}