You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2015/09/22 19:40:46 UTC
drill git commit: DRILL-2748: Improve cost estimation for Drill logical aggregation in query planner.
Repository: drill
Updated Branches:
refs/heads/master 9f54aac33 -> d37462e59
DRILL-2748: Improve cost estimation for Drill logical aggregation in query planner.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d37462e5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d37462e5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d37462e5
Branch: refs/heads/master
Commit: d37462e59c68c570fae834d2a95a1ccfde042105
Parents: 9f54aac
Author: Jinfeng Ni <jn...@apache.org>
Authored: Tue Sep 15 22:05:21 2015 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Sep 21 22:45:56 2015 -0700
----------------------------------------------------------------------
.../planner/common/DrillAggregateRelBase.java | 61 ++++++++++++----
.../exec/planner/logical/DrillAggregateRel.java | 19 +++++
.../exec/planner/physical/AggPrelBase.java | 3 +-
.../exec/planner/physical/HashAggPrel.java | 32 +-------
.../exec/fn/impl/TestAggregateFunctions.java | 77 ++++++++++++++++++++
5 files changed, 147 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java
index e0f9e34..ac6be25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java
@@ -17,9 +17,12 @@
*/
package org.apache.drill.exec.planner.common;
-import java.util.BitSet;
import java.util.List;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.core.Aggregate;
@@ -30,6 +33,9 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.drill.exec.planner.logical.DrillAggregateRel; // NOTE(review): not referenced by any visible code (only named in Javadoc text); importing the logical package from common may be unnecessary — confirm and drop
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.pentaho.aggdes.algorithm.impl.Cost; // NOTE(review): no visible use of this pentaho Cost type; looks like an accidental IDE auto-import — confirm and drop
/**
@@ -42,19 +48,48 @@ public abstract class DrillAggregateRelBase extends Aggregate implements DrillRe
super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
}
- @Override
- public RelOptCost computeSelfCost(RelOptPlanner planner) {
- for (AggregateCall aggCall : getAggCallList()) {
- String name = aggCall.getAggregation().getName();
- // For avg, stddev_pop, stddev_samp, var_pop and var_samp, the ReduceAggregatesRule is supposed
- // to convert them to use sum and count. Here, we make the cost of the original functions high
- // enough such that the planner does not choose them and instead chooses the rewritten functions.
- if (name.equals("AVG") || name.equals("STDDEV_POP") || name.equals("STDDEV_SAMP")
- || name.equals("VAR_POP") || name.equals("VAR_SAMP")) {
- return ((DrillCostFactory)planner.getCostFactory()).makeHugeCost();
- }
+
+ /**
+ * Estimates the cost of a hash aggregation over this node's input. With default costing enabled it returns 10% of the cost computed by Calcite's Aggregate; otherwise it charges CPU for hashing each group key and evaluating each aggregate call per input row, plus hash-table memory sized from the input row count, with zero disk I/O and network cost. Called by HashAggPrel.computeSelfCost() and, through computeLogicalAggCost(), by DrillAggregateRel.computeSelfCost().
+ */
+ protected RelOptCost computeHashAggCost(RelOptPlanner planner) {
+ if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { // default costing: scale the cost computed by Calcite's Aggregate by 0.1
+ return super.computeSelfCost(planner).multiplyBy(.1);
}
- return ((DrillCostFactory)planner.getCostFactory()).makeTinyCost();
+ RelNode child = this.getInput();
+ double inputRows = RelMetadataQuery.getRowCount(child);
+
+ int numGroupByFields = this.getGroupCount();
+ int numAggrFields = this.aggCalls.size();
+ // cpu cost of hashing each grouping key
+ double cpuCost = DrillCostBase.HASH_CPU_COST * numGroupByFields * inputRows;
+ // add cpu cost for computing the aggregate functions
+ cpuCost += DrillCostBase.FUNC_CPU_COST * numAggrFields * inputRows;
+ double diskIOCost = 0; // assume in-memory for now until we enforce operator-level memory constraints
+
+ // TODO: use distinct row count
+ // + hash table template stuff
+ double factor = PrelUtil.getPlannerSettings(planner).getOptions()
+ .getOption(ExecConstants.HASH_AGG_TABLE_FACTOR_KEY).float_val;
+ long fieldWidth = PrelUtil.getPlannerSettings(planner).getOptions()
+ .getOption(ExecConstants.AVERAGE_FIELD_WIDTH_KEY).num_val;
+
+ // per input row: group-key table (fieldWidth per key) + hashValues + links (one int each), scaled by the hash-table factor
+ double memCost =
+ (
+ (fieldWidth * numGroupByFields) +
+ IntHolder.WIDTH +
+ IntHolder.WIDTH
+ ) * inputRows * factor;
+
+ DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory();
+ return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0 /* network cost */, memCost);
+
+ }
+
+ protected RelOptCost computeLogicalAggCost(RelOptPlanner planner) {
+ // As with join cost estimation, use the hash-agg cost model during logical planning.
+ return computeHashAggCost(planner);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
index 5419315..cf5988d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
@@ -21,6 +21,8 @@ import java.util.BitSet;
import java.util.List;
import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.util.BitSets;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -31,6 +33,7 @@ import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.logical.data.GroupingAggregate;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.torel.ConversionContext;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Aggregate;
@@ -82,6 +85,22 @@ public class DrillAggregateRel extends DrillAggregateRelBase implements DrillRel
return builder.build();
}
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ for (AggregateCall aggCall : getAggCallList()) {
+ String name = aggCall.getAggregation().getName();
+ // For avg, stddev_pop, stddev_samp, var_pop and var_samp, the ReduceAggregatesRule is supposed
+ // to convert them to use sum and count. Here, we make the cost of the original functions high
+ // enough that the planner does not choose them and instead chooses the rewritten functions.
+ if (name.equals("AVG") || name.equals("STDDEV_POP") || name.equals("STDDEV_SAMP")
+ || name.equals("VAR_POP") || name.equals("VAR_SAMP")) {
+ return ((DrillCostBase.DrillCostFactory)planner.getCostFactory()).makeHugeCost();
+ }
+ }
+
+ return computeLogicalAggCost(planner); // all other aggregates: cost with the shared hash-agg model in DrillAggregateRelBase
+ }
+
public static LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) {
List<LogicalExpression> args = Lists.newArrayList();
for(Integer i : call.getArgList()) {
http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index e14ab24..0e5ea0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Aggregate;
@@ -48,7 +49,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-public abstract class AggPrelBase extends Aggregate implements Prel {
+public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel { // extends DrillAggregateRelBase (was Aggregate) so physical aggregates can reuse computeHashAggCost()
protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index 6181959..637fb8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -66,37 +66,7 @@ public class HashAggPrel extends AggPrelBase implements Prel{
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
- return super.computeSelfCost(planner).multiplyBy(.1);
- }
- RelNode child = this.getInput();
- double inputRows = RelMetadataQuery.getRowCount(child);
-
- int numGroupByFields = this.getGroupCount();
- int numAggrFields = this.aggCalls.size();
- // cpu cost of hashing each grouping key
- double cpuCost = DrillCostBase.HASH_CPU_COST * numGroupByFields * inputRows;
- // add cpu cost for computing the aggregate functions
- cpuCost += DrillCostBase.FUNC_CPU_COST * numAggrFields * inputRows;
- double diskIOCost = 0; // assume in-memory for now until we enforce operator-level memory constraints
-
- // TODO: use distinct row count
- // + hash table template stuff
- double factor = PrelUtil.getPlannerSettings(planner).getOptions()
- .getOption(ExecConstants.HASH_AGG_TABLE_FACTOR_KEY).float_val;
- long fieldWidth = PrelUtil.getPlannerSettings(planner).getOptions()
- .getOption(ExecConstants.AVERAGE_FIELD_WIDTH_KEY).num_val;
-
- // table + hashValues + links
- double memCost =
- (
- (fieldWidth * numGroupByFields) +
- IntHolder.WIDTH +
- IntHolder.WIDTH
- ) * inputRows * factor;
-
- DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory();
- return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0 /* network cost */, memCost);
+ return super.computeHashAggCost(planner); // cost model now shared with DrillAggregateRel via DrillAggregateRelBase; NOTE(review): imports used only by the removed body (e.g. IntHolder, ExecConstants, RelMetadataQuery) may now be unused — confirm
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index 78ebe0f..c02fc05 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.fn.impl;
import org.apache.drill.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.util.TestTools;
import org.junit.Ignore;
@@ -323,4 +324,80 @@ public class TestAggregateFunctions extends BaseTestQuery {
.baselineValues(6l)
.go();
}
+
+ @Test //DRILL-2748
+ public void testPushFilterPastAgg() throws Exception {
+ final String query =
+ " select cnt " +
+ " from (select n_regionkey, count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey) " +
+ " where n_regionkey = 2 ";
+
+ // Validate the plan: the aggregate must appear before the Filter in the plan text (filter pushed below the aggregate)
+ final String[] expectedPlan = {"(?s)(StreamAgg|HashAgg).*Filter"};
+ final String[] excludedPatterns = {"(?s)Filter.*(StreamAgg|HashAgg)"};
+ PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPatterns);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(5l)
+ .build().run();
+
+ // Same check with the filter on the group key written as a HAVING clause
+ final String query2 =
+ " select count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey " +
+ " having n_regionkey = 2 ";
+ PlanTestBase.testPlanMatchingPatterns(query2, expectedPlan, excludedPatterns);
+
+ testBuilder()
+ .sqlQuery(query2)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(5l)
+ .build().run();
+ }
+
+ @Test
+ public void testPushFilterInExprPastAgg() throws Exception {
+ final String query =
+ " select cnt " +
+ " from (select n_regionkey, count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey) " +
+ " where n_regionkey + 100 - 100 = 2 ";
+
+ // Validate the plan: the aggregate must appear before the Filter in the plan text (filter on a group-key expression pushed below the aggregate)
+ final String[] expectedPlan = {"(?s)(StreamAgg|HashAgg).*Filter"};
+ final String[] excludedPatterns = {"(?s)Filter.*(StreamAgg|HashAgg)"};
+ PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPatterns);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(5l)
+ .build().run();
+ }
+
+ @Test
+ public void testNegPushFilterInExprPastAgg() throws Exception {
+ // negative case: should not push filter, since it involves the aggregate result
+ final String query =
+ " select cnt " +
+ " from (select n_regionkey, count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey) " +
+ " where cnt + 100 - 100 = 5 ";
+
+ // Validate the plan: the Filter must appear before the aggregate in the plan text (filter kept above the aggregate)
+ final String[] expectedPlan = {"(?s)Filter.*(StreamAgg|HashAgg)"};
+ final String[] excludedPatterns = {"(?s)(StreamAgg|HashAgg).*Filter"};
+ PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPatterns);
+
+ // negative case: should not push filter, since it is an expression of group key + agg result.
+ final String query2 =
+ " select cnt " +
+ " from (select n_regionkey, count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey) " +
+ " where cnt + n_regionkey = 5 ";
+ PlanTestBase.testPlanMatchingPatterns(query2, expectedPlan, excludedPatterns);
+
+ }
+
}