You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2015/09/22 19:40:46 UTC

drill git commit: DRILL-2748: Improve cost estimation for Drill logical aggregation in query planner.

Repository: drill
Updated Branches:
  refs/heads/master 9f54aac33 -> d37462e59


DRILL-2748: Improve cost estimation for Drill logical aggregation in query planner.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d37462e5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d37462e5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d37462e5

Branch: refs/heads/master
Commit: d37462e59c68c570fae834d2a95a1ccfde042105
Parents: 9f54aac
Author: Jinfeng Ni <jn...@apache.org>
Authored: Tue Sep 15 22:05:21 2015 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Sep 21 22:45:56 2015 -0700

----------------------------------------------------------------------
 .../planner/common/DrillAggregateRelBase.java   | 61 ++++++++++++----
 .../exec/planner/logical/DrillAggregateRel.java | 19 +++++
 .../exec/planner/physical/AggPrelBase.java      |  3 +-
 .../exec/planner/physical/HashAggPrel.java      | 32 +-------
 .../exec/fn/impl/TestAggregateFunctions.java    | 77 ++++++++++++++++++++
 5 files changed, 147 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java
index e0f9e34..ac6be25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillAggregateRelBase.java
@@ -17,9 +17,12 @@
  */
 package org.apache.drill.exec.planner.common;
 
-import java.util.BitSet;
 import java.util.List;
 
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.rel.core.Aggregate;
@@ -30,6 +33,9 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.pentaho.aggdes.algorithm.impl.Cost;
 
 
 /**
@@ -42,19 +48,48 @@ public abstract class DrillAggregateRelBase extends Aggregate implements DrillRe
     super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
   }
 
-  @Override
-  public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    for (AggregateCall aggCall : getAggCallList()) {
-      String name = aggCall.getAggregation().getName();
-      // For avg, stddev_pop, stddev_samp, var_pop and var_samp, the ReduceAggregatesRule is supposed
-      // to convert them to use sum and count. Here, we make the cost of the original functions high
-      // enough such that the planner does not choose them and instead chooses the rewritten functions.
-      if (name.equals("AVG") || name.equals("STDDEV_POP") || name.equals("STDDEV_SAMP")
-          || name.equals("VAR_POP") || name.equals("VAR_SAMP")) {
-        return ((DrillCostFactory)planner.getCostFactory()).makeHugeCost();
-      }
+
+  /**
+   * Estimates the cost of a hash aggregate. Called by DrillAggregateRel.computeSelfCost() and HashAggPrel.computeSelfCost().
+   */
+  protected RelOptCost computeHashAggCost(RelOptPlanner planner) {
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1);
     }
-    return ((DrillCostFactory)planner.getCostFactory()).makeTinyCost();
+    RelNode child = this.getInput();
+    double inputRows = RelMetadataQuery.getRowCount(child);
+
+    int numGroupByFields = this.getGroupCount();
+    int numAggrFields = this.aggCalls.size();
+    // cpu cost of hashing each grouping key
+    double cpuCost = DrillCostBase.HASH_CPU_COST * numGroupByFields * inputRows;
+    // add cpu cost for computing the aggregate functions
+    cpuCost += DrillCostBase.FUNC_CPU_COST * numAggrFields * inputRows;
+    double diskIOCost = 0; // assume in-memory for now until we enforce operator-level memory constraints
+
+    // TODO: use distinct row count
+    // + hash table template stuff
+    double factor = PrelUtil.getPlannerSettings(planner).getOptions()
+        .getOption(ExecConstants.HASH_AGG_TABLE_FACTOR_KEY).float_val;
+    long fieldWidth = PrelUtil.getPlannerSettings(planner).getOptions()
+        .getOption(ExecConstants.AVERAGE_FIELD_WIDTH_KEY).num_val;
+
+    // table + hashValues + links
+    double memCost =
+        (
+            (fieldWidth * numGroupByFields) +
+                IntHolder.WIDTH +
+                IntHolder.WIDTH
+        ) * inputRows * factor;
+
+    DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory();
+    return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0 /* network cost */, memCost);
+
+  }
+
+  protected RelOptCost computeLogicalAggCost(RelOptPlanner planner) {
+    // Similar to Join cost estimation, use HashAgg cost during the logical planning.
+    return computeHashAggCost(planner);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
index 5419315..cf5988d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
@@ -21,6 +21,8 @@ import java.util.BitSet;
 import java.util.List;
 
 import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.util.BitSets;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -31,6 +33,7 @@ import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.logical.data.GroupingAggregate;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.torel.ConversionContext;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.Aggregate;
@@ -82,6 +85,22 @@ public class DrillAggregateRel extends DrillAggregateRelBase implements DrillRel
     return builder.build();
   }
 
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    for (AggregateCall aggCall : getAggCallList()) {
+      String name = aggCall.getAggregation().getName();
+      // For avg, stddev_pop, stddev_samp, var_pop and var_samp, the ReduceAggregatesRule is supposed
+      // to convert them to use sum and count. Here, we make the cost of the original functions high
+      // enough such that the planner does not choose them and instead chooses the rewritten functions.
+      if (name.equals("AVG") || name.equals("STDDEV_POP") || name.equals("STDDEV_SAMP")
+          || name.equals("VAR_POP") || name.equals("VAR_SAMP")) {
+        return ((DrillCostBase.DrillCostFactory)planner.getCostFactory()).makeHugeCost();
+      }
+    }
+
+    return computeLogicalAggCost(planner);
+  }
+
   public static LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) {
     List<LogicalExpression> args = Lists.newArrayList();
     for(Integer i : call.getArgList()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index e14ab24..0e5ea0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.Aggregate;
@@ -48,7 +49,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-public abstract class AggPrelBase extends Aggregate implements Prel {
+public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel {
 
   protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index 6181959..637fb8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -66,37 +66,7 @@ public class HashAggPrel extends AggPrelBase implements Prel{
 
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner).multiplyBy(.1);
-    }
-    RelNode child = this.getInput();
-    double inputRows = RelMetadataQuery.getRowCount(child);
-
-    int numGroupByFields = this.getGroupCount();
-    int numAggrFields = this.aggCalls.size();
-    // cpu cost of hashing each grouping key
-    double cpuCost = DrillCostBase.HASH_CPU_COST * numGroupByFields * inputRows;
-    // add cpu cost for computing the aggregate functions
-    cpuCost += DrillCostBase.FUNC_CPU_COST * numAggrFields * inputRows;
-    double diskIOCost = 0; // assume in-memory for now until we enforce operator-level memory constraints
-
-    // TODO: use distinct row count
-    // + hash table template stuff
-    double factor = PrelUtil.getPlannerSettings(planner).getOptions()
-      .getOption(ExecConstants.HASH_AGG_TABLE_FACTOR_KEY).float_val;
-    long fieldWidth = PrelUtil.getPlannerSettings(planner).getOptions()
-      .getOption(ExecConstants.AVERAGE_FIELD_WIDTH_KEY).num_val;
-
-    // table + hashValues + links
-    double memCost =
-      (
-        (fieldWidth * numGroupByFields) +
-          IntHolder.WIDTH +
-          IntHolder.WIDTH
-      ) * inputRows * factor;
-
-    DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory();
-    return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0 /* network cost */, memCost);
+    return super.computeHashAggCost(planner);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/d37462e5/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index 78ebe0f..c02fc05 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.fn.impl;
 
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.TestTools;
 import org.junit.Ignore;
@@ -323,4 +324,80 @@ public class TestAggregateFunctions extends BaseTestQuery {
         .baselineValues(6l)
         .go();
   }
+
+  @Test //DRILL-2748
+  public void testPushFilterPastAgg() throws Exception {
+    final String query =
+        " select cnt " +
+        " from (select n_regionkey, count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey) " +
+        " where n_regionkey = 2 ";
+
+    // Validate the plan: an aggregate followed by a Filter must match, and Filter followed by an aggregate must not.
+    final String[] expectedPlan = {"(?s)(StreamAgg|HashAgg).*Filter"};
+    final String[] excludedPatterns = {"(?s)Filter.*(StreamAgg|HashAgg)"};
+    PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPatterns);
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("cnt")
+        .baselineValues(5l)
+        .build().run();
+
+    // Same checks for the HAVING form; the result check below must run query2, not the first query.
+    final String query2 =
+        " select count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey " +
+        " having n_regionkey = 2 ";
+    PlanTestBase.testPlanMatchingPatterns(query2, expectedPlan, excludedPatterns);
+
+    testBuilder()
+        .sqlQuery(query2)
+        .unOrdered()
+        .baselineColumns("cnt")
+        .baselineValues(5l)
+        .build().run();
+  }
+
+  @Test
+  public void testPushFilterInExprPastAgg() throws Exception {
+    final String query =
+        " select cnt " +
+            " from (select n_regionkey, count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey) " +
+            " where n_regionkey + 100 - 100 = 2 ";
+
+    // Validate the plan
+    final String[] expectedPlan = {"(?s)(StreamAgg|HashAgg).*Filter"};
+    final String[] excludedPatterns = {"(?s)Filter.*(StreamAgg|HashAgg)"};
+    PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPatterns);
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("cnt")
+        .baselineValues(5l)
+        .build().run();
+  }
+
+  @Test
+  public void testNegPushFilterInExprPastAgg() throws Exception {
+    // negative case: should not push filter, since it involves the aggregate result
+    final String query =
+        " select cnt " +
+            " from (select n_regionkey, count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey) " +
+            " where cnt + 100 - 100 = 5 ";
+
+    // Validate the plan
+    final String[] expectedPlan = {"(?s)Filter(?!StreamAgg|!HashAgg)"};
+    final String[] excludedPatterns = {"(?s)(StreamAgg|HashAgg).*Filter"};
+    PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPatterns);
+
+    // negative case: should not push filter, since it is expression of group key + agg result.
+    final String query2 =
+        " select cnt " +
+            " from (select n_regionkey, count(*) cnt from cp.`tpch/nation.parquet` group by n_regionkey) " +
+            " where cnt + n_regionkey = 5 ";
+    PlanTestBase.testPlanMatchingPatterns(query2, expectedPlan, excludedPatterns);
+
+  }
+
 }