You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2018/01/18 23:17:27 UTC
[03/18] drill git commit: DRILL-3993: Fix aggregate exchange rules
for the cases when aggregate rel node contains several calls
DRILL-3993: Fix aggregate exchange rules for the cases when aggregate rel node contains several calls
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3c9093e3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3c9093e3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3c9093e3
Branch: refs/heads/master
Commit: 3c9093e32a095bd40832bcd8fe67ab20898537c4
Parents: 22d0f7e
Author: Volodymyr Vysotskyi <vv...@gmail.com>
Authored: Thu Jan 4 16:05:53 2018 +0200
Committer: Volodymyr Vysotskyi <vv...@gmail.com>
Committed: Tue Jan 16 12:10:13 2018 +0200
----------------------------------------------------------------------
.../exec/planner/physical/AggPrelBase.java | 24 ++++++++++--------
.../exec/planner/physical/AggPruleBase.java | 26 ++++++++++++++++----
.../exec/planner/physical/HashAggPrule.java | 22 +++++++++++------
.../exec/planner/physical/StreamAggPrule.java | 25 ++++++++++++++-----
4 files changed, 69 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3c9093e3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 8c69930..232473b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -31,7 +31,6 @@ import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptCluster;
@@ -44,14 +43,13 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
-import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel {
- public static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
+ public enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}
protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase
protected List<NamedExpression> keys = Lists.newArrayList();
@@ -70,11 +68,14 @@ public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel
public SqlSumCountAggFunction(RelDataType type) {
super("$SUM0",
+ null,
SqlKind.OTHER_FUNCTION,
ReturnTypes.BIGINT, // use the inferred return type of SqlCountAggFunction
null,
OperandTypes.NUMERIC,
- SqlFunctionCategory.NUMERIC);
+ SqlFunctionCategory.NUMERIC,
+ false,
+ false);
this.type = type;
}
@@ -143,20 +144,24 @@ public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel
// If we are doing a COUNT aggregate in Phase1of2, then in Phase2of2 we should SUM the COUNTs,
SqlAggFunction sumAggFun = new SqlSumCountAggFunction(aggCall.e.getType());
AggregateCall newAggCall =
- new AggregateCall(
+ AggregateCall.create(
sumAggFun,
aggCall.e.isDistinct(),
+ aggCall.e.isApproximate(),
Collections.singletonList(aggExprOrdinal),
+ aggCall.e.filterArg,
aggCall.e.getType(),
aggCall.e.getName());
phase2AggCallList.add(newAggCall);
} else {
AggregateCall newAggCall =
- new AggregateCall(
+ AggregateCall.create(
aggCall.e.getAggregation(),
aggCall.e.isDistinct(),
+ aggCall.e.isApproximate(),
Collections.singletonList(aggExprOrdinal),
+ aggCall.e.filterArg,
aggCall.e.getType(),
aggCall.e.getName());
@@ -174,10 +179,9 @@ public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel
// for count(1).
if (args.isEmpty()) {
- args.add(new ValueExpressions.LongExpression(1l));
+ args.add(new ValueExpressions.LongExpression(1L));
}
- LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
- return expr;
+ return new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/3c9093e3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 84e37fc..6863967 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -21,8 +21,8 @@ package org.apache.drill.exec.planner.physical;
import java.util.List;
import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.calcite.rel.core.AggregateCall;
@@ -42,7 +42,7 @@ public abstract class AggPruleBase extends Prule {
protected List<DistributionField> getDistributionField(DrillAggregateRel rel, boolean allFields) {
List<DistributionField> groupByFields = Lists.newArrayList();
- for (int group : BitSets.toIter(rel.getGroupSet())) {
+ for (int group : remapGroupSet(rel.getGroupSet())) {
DistributionField field = new DistributionField(group);
groupByFields.add(field);
@@ -63,10 +63,11 @@ public abstract class AggPruleBase extends Prule {
protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
RelNode child = call.rel(0).getInputs().get(0);
- boolean smallInput = child.getRows() < settings.getSliceTarget();
- if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() ||
+ boolean smallInput =
+ child.estimateRowCount(child.getCluster().getMetadataQuery()) < settings.getSliceTarget();
+ if (!settings.isMultiPhaseAggEnabled() || settings.isSingleMode()
// Can override a small child - e.g., for testing with a small table
- ( smallInput && ! settings.isForce2phaseAggr() ) ) {
+ || (smallInput && !settings.isForce2phaseAggr())) {
return false;
}
@@ -82,4 +83,19 @@ public abstract class AggPruleBase extends Prule {
}
return true;
}
+
+ /**
+ * Remaps group-by keys to the dense ordinals {@code 0..n-1}, where {@code n} is the number of bits set in {@code groupSet}.
+ *
+ * @param groupSet ImmutableBitSet of aggregate rel node, whose group-by keys should be remapped.
+ * @return {@link ImmutableBitSet} instance built from the ordinals {@code 0} through {@code n-1}, one per key in {@code groupSet}.
+ */
+ public static ImmutableBitSet remapGroupSet(ImmutableBitSet groupSet) {
+ List<Integer> newGroupSet = Lists.newArrayList();
+ int groupSetToAdd = 0; // next ordinal to assign
+ for (int ignored : groupSet) { // original bit positions are discarded; only their count is used
+ newGroupSet.add(groupSetToAdd++);
+ }
+ return ImmutableBitSet.of(newGroupSet);
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3c9093e3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index f4cdf62..02dd4de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.planner.physical;
+import com.google.common.collect.Lists;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
@@ -31,6 +33,8 @@ import org.apache.calcite.util.trace.CalciteTrace;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
+import java.util.List;
+
public class HashAggPrule extends AggPruleBase {
public static final RelOptRule INSTANCE = new HashAggPrule();
protected static final Logger tracer = CalciteTrace.getPlannerTracer();
@@ -51,7 +55,7 @@ public class HashAggPrule extends AggPruleBase {
return;
}
- final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
+ final DrillAggregateRel aggregate = call.rel(0);
final RelNode input = call.rel(1);
if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
@@ -60,7 +64,7 @@ public class HashAggPrule extends AggPruleBase {
return;
}
- RelTraitSet traits = null;
+ RelTraitSet traits;
try {
if (aggregate.getGroupSet().isEmpty()) {
@@ -125,18 +129,22 @@ public class HashAggPrule extends AggPruleBase {
new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
- HashAggPrel phase2Agg = new HashAggPrel(
+ ImmutableBitSet newGroupSet = remapGroupSet(aggregate.getGroupSet());
+ List<ImmutableBitSet> newGroupSets = Lists.newArrayList();
+ for (ImmutableBitSet groupSet : aggregate.getGroupSets()) {
+ newGroupSets.add(remapGroupSet(groupSet));
+ }
+
+ return new HashAggPrel(
aggregate.getCluster(),
exch.getTraitSet(),
exch,
aggregate.indicator,
- aggregate.getGroupSet(),
- aggregate.getGroupSets(),
+ newGroupSet,
+ newGroupSets,
phase1Agg.getPhase2AggCalls(),
OperatorPhase.PHASE_2of2);
- return phase2Agg;
}
-
}
private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate,
http://git-wip-us.apache.org/repos/asf/drill/blob/3c9093e3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index a6a8f28..29fa750 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
@@ -53,10 +54,10 @@ public class StreamAggPrule extends AggPruleBase {
@Override
public void onMatch(RelOptRuleCall call) {
- final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
+ final DrillAggregateRel aggregate = call.rel(0);
RelNode input = aggregate.getInput();
final RelCollation collation = getCollation(aggregate);
- RelTraitSet traits = null;
+ RelTraitSet traits;
if (aggregate.containsDistinctCall()) {
// currently, don't use StreamingAggregate if any of the logical aggrs contains DISTINCT
@@ -93,13 +94,19 @@ public class StreamAggPrule extends AggPruleBase {
UnionExchangePrel exch =
new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
+ ImmutableBitSet newGroupSet = remapGroupSet(aggregate.getGroupSet());
+ List<ImmutableBitSet> newGroupSets = Lists.newArrayList();
+ for (ImmutableBitSet groupSet : aggregate.getGroupSets()) {
+ newGroupSets.add(remapGroupSet(groupSet));
+ }
+
return new StreamAggPrel(
aggregate.getCluster(),
singleDistTrait,
exch,
aggregate.indicator,
- aggregate.getGroupSet(),
- aggregate.getGroupSets(),
+ newGroupSet,
+ newGroupSets,
phase1Agg.getPhase2AggCalls(),
OperatorPhase.PHASE_2of2);
}
@@ -160,13 +167,19 @@ public class StreamAggPrule extends AggPruleBase {
collation,
numEndPoints);
+ ImmutableBitSet newGroupSet = remapGroupSet(aggregate.getGroupSet());
+ List<ImmutableBitSet> newGroupSets = Lists.newArrayList();
+ for (ImmutableBitSet groupSet : aggregate.getGroupSets()) {
+ newGroupSets.add(remapGroupSet(groupSet));
+ }
+
return new StreamAggPrel(
aggregate.getCluster(),
exch.getTraitSet(),
exch,
aggregate.indicator,
- aggregate.getGroupSet(),
- aggregate.getGroupSets(),
+ newGroupSet,
+ newGroupSets,
phase1Agg.getPhase2AggCalls(),
OperatorPhase.PHASE_2of2);
}