You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2019/08/17 15:13:10 UTC

[impala] 01/02: IMPALA-8790: fix referencing wrong grouping exprs of MultiAggregateInfo

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c665fc1e06d53c7b70611ad3993c712c7fe35cb2
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Thu Aug 15 06:01:45 2019 -0700

    IMPALA-8790: fix referencing wrong grouping exprs of MultiAggregateInfo
    
    When creating the single node plan for analytic functions (see
    SingleNodePlanner#createQueryPlan), if the query block contains
    aggregations, the grouping exprs are passed along for optimizations (see
    AnalyticPlanner#computeInputPartitionExprs). This patch fixes a
    regression in this phase that was introduced by IMPALA-110.
    
    Before IMPALA-110, the planner passed along the groupingExprs of the
    AggregateInfo (if any). Those exprs had already been substituted in
    AggregationNode#init, which is called from
    SingleNodePlanner#createSelectPlan. After IMPALA-110, the planner passes
    along the groupingExprs of the MultiAggregateInfo (if any), and those are
    not substituted in AggregationNode#init. Instead, MultiAggregateInfo
    creates substituted grouping exprs in this step, and those are what we
    actually need. The original grouping exprs may reference non-materialized
    slots, which must not be referenced in exchanges. This patch therefore
    uses MultiAggregateInfo#getSubstGroupingExprs instead.
    
    Also add some useful TRACE logs for future debugging.
    
    Tests
      - Add planner tests to cover the regression bug.
      - Run CORE tests
    
    Change-Id: I11a80bd6d73ea00ad8c644469558a1885706f596
    Reviewed-on: http://gerrit.cloudera.org:8080/14063
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/planner/AnalyticPlanner.java | 10 +++
 .../java/org/apache/impala/planner/Planner.java    |  2 +
 .../apache/impala/planner/SingleNodePlanner.java   | 10 ++-
 .../queries/PlannerTest/analytic-fns.test          | 90 ++++++++++++++++++++++
 4 files changed, 109 insertions(+), 3 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 3e58eae..af2a871 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -209,6 +209,13 @@ public class AnalyticPlanner {
       analyzer_.exprIntersect(pg.partitionByExprs, groupingExprs, l1, l2);
       // TODO: also look at l2 and take the max?
       long ndv = Expr.getNumDistinctValues(l1);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(String.format("Partition group: %s, intersection: %s. " +
+                "GroupingExprs: %s, intersection: %s. ndv: %d, numNodes: %d, maxNdv: %d.",
+            Expr.debugString(pg.partitionByExprs), Expr.debugString(l1),
+            Expr.debugString(groupingExprs), Expr.debugString(l2),
+            ndv, numNodes, maxNdv));
+      }
       if (ndv < 0 || ndv < numNodes || ndv < maxNdv) continue;
       // found a better partition group
       maxPg = pg;
@@ -224,6 +231,9 @@ public class AnalyticPlanner {
       partitionGroups.remove(maxPg);
       partitionGroups.add(0, maxPg);
       inputPartitionExprs.addAll(maxGroupingExprs);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Optimized partition exprs: " + Expr.debugString(inputPartitionExprs));
+      }
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index dbcff87..6854f73 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -640,6 +640,8 @@ public class Planner {
     int threshold = ctx_.getQueryOptions().exec_single_node_rows_threshold;
     if (maxRowsProcessed < threshold) {
       // Execute on a single node and disable codegen for small results
+      LOG.trace("Query is small enough to execute on a single node: maxRowsProcessed = "
+          + maxRowsProcessed);
       ctx_.getQueryOptions().setNum_nodes(1);
       ctx_.getQueryCtx().disable_codegen_hint = true;
       if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index f6d6302..3265daf 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -266,9 +266,13 @@ public class SingleNodePlanner {
         AnalyticPlanner analyticPlanner =
             new AnalyticPlanner(analyticInfo, analyzer, ctx_);
         MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo();
-        List<Expr> groupingExprs = multiAggInfo != null ?
-            multiAggInfo.getGroupingExprs() :
-            Collections.<Expr>emptyList();
+        List<Expr> groupingExprs;
+        if (multiAggInfo != null) {
+          groupingExprs = multiAggInfo.getSubstGroupingExprs();
+          Preconditions.checkState(groupingExprs != null);
+        } else {
+          groupingExprs = Collections.emptyList();
+        }
         List<Expr> inputPartitionExprs = new ArrayList<>();
         root = analyticPlanner.createSingleNodePlan(
             root, groupingExprs, inputPartitionExprs);
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
index a2038e4..b4856f3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
@@ -2921,3 +2921,93 @@ PLAN-ROOT SINK
    partitions=4/4 files=4 size=460B
    row-size=0B cardinality=8
 ====
+# Regression test for IMPALA-8790
+# Query block contains analytic functions and aggregations together. The GROUP BY clause
+# references inline view columns.
+select string_col, int_col,
+  rank() over(partition by int_col order by count(bigint_col))
+from (select string_col, int_col, bigint_col from functional.alltypes) w
+group by string_col, int_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+03:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: count(bigint_col) ASC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=33B cardinality=100
+|
+02:SORT
+|  order by: int_col ASC NULLS FIRST, count(bigint_col) ASC
+|  row-size=25B cardinality=100
+|
+05:AGGREGATE [FINALIZE]
+|  output: count:merge(bigint_col)
+|  group by: string_col, int_col
+|  row-size=25B cardinality=100
+|
+04:EXCHANGE [HASH(int_col)]
+|
+01:AGGREGATE [STREAMING]
+|  output: count(bigint_col)
+|  group by: string_col, int_col
+|  row-size=25B cardinality=100
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=25B cardinality=7.30K
+====
+# Regression test for IMPALA-8790
+# Query block contains analytic functions and aggregations together. The GROUP BY clause
+# references inline view columns. Also covers distinct aggregation.
+select string_col, int_col,
+  rank() over(partition by int_col order by count(distinct bigint_col))
+from (select string_col, int_col, bigint_col from functional.alltypes) w
+group by string_col, int_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:EXCHANGE [UNPARTITIONED]
+|
+04:ANALYTIC
+|  functions: rank()
+|  partition by: int_col
+|  order by: count(bigint_col) ASC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=33B cardinality=100
+|
+03:SORT
+|  order by: int_col ASC NULLS FIRST, count(bigint_col) ASC
+|  row-size=25B cardinality=100
+|
+09:EXCHANGE [HASH(int_col)]
+|
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(bigint_col)
+|  group by: string_col, int_col
+|  row-size=25B cardinality=100
+|
+07:EXCHANGE [HASH(string_col,int_col)]
+|
+02:AGGREGATE [STREAMING]
+|  output: count(bigint_col)
+|  group by: string_col, int_col
+|  row-size=25B cardinality=100
+|
+06:AGGREGATE
+|  group by: string_col, int_col, bigint_col
+|  row-size=25B cardinality=1.00K
+|
+05:EXCHANGE [HASH(int_col)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: string_col, int_col, bigint_col
+|  row-size=25B cardinality=1.00K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=25B cardinality=7.30K
+====