You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2019/08/17 15:13:10 UTC
[impala] 01/02: IMPALA-8790: fix referencing wrong grouping exprs
of MultiAggregateInfo
This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit c665fc1e06d53c7b70611ad3993c712c7fe35cb2
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Thu Aug 15 06:01:45 2019 -0700
IMPALA-8790: fix referencing wrong grouping exprs of MultiAggregateInfo
When creating the single node plan for analytic functions (see
SingleNodePlanner#createQueryPlan), if the query block contains
aggregations, the grouping exprs are carried along for optimizations (see
AnalyticPlanner#computeInputPartitionExprs). This patch fixes a
regression introduced by IMPALA-110 in this phase.
The pre-IMPALA-110 behavior was to carry the groupingExprs of the
AggregateInfo (if any). Those exprs are already substituted in
AggregationNode#init, which is called from SingleNodePlanner#createSelectPlan.
Now the behavior is to carry the groupingExprs of the MultiAggregateInfo
(if any). Those exprs are not substituted in AggregationNode#init.
Instead, MultiAggregateInfo creates substituted grouping exprs in this
step, and those are what we actually need. The original grouping exprs may
use non-materialized slots, which should not be referenced in exchanges.
Also add some useful TRACE logs for future debugging.
Tests
- Add planner tests to cover the regression bug.
- Run CORE tests
Change-Id: I11a80bd6d73ea00ad8c644469558a1885706f596
Reviewed-on: http://gerrit.cloudera.org:8080/14063
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../org/apache/impala/planner/AnalyticPlanner.java | 10 +++
.../java/org/apache/impala/planner/Planner.java | 2 +
.../apache/impala/planner/SingleNodePlanner.java | 10 ++-
.../queries/PlannerTest/analytic-fns.test | 90 ++++++++++++++++++++++
4 files changed, 109 insertions(+), 3 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 3e58eae..af2a871 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -209,6 +209,13 @@ public class AnalyticPlanner {
analyzer_.exprIntersect(pg.partitionByExprs, groupingExprs, l1, l2);
// TODO: also look at l2 and take the max?
long ndv = Expr.getNumDistinctValues(l1);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(String.format("Partition group: %s, intersection: %s. " +
+ "GroupingExprs: %s, intersection: %s. ndv: %d, numNodes: %d, maxNdv: %d.",
+ Expr.debugString(pg.partitionByExprs), Expr.debugString(l1),
+ Expr.debugString(groupingExprs), Expr.debugString(l2),
+ ndv, numNodes, maxNdv));
+ }
if (ndv < 0 || ndv < numNodes || ndv < maxNdv) continue;
// found a better partition group
maxPg = pg;
@@ -224,6 +231,9 @@ public class AnalyticPlanner {
partitionGroups.remove(maxPg);
partitionGroups.add(0, maxPg);
inputPartitionExprs.addAll(maxGroupingExprs);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Optimized partition exprs: " + Expr.debugString(inputPartitionExprs));
+ }
}
}
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index dbcff87..6854f73 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -640,6 +640,8 @@ public class Planner {
int threshold = ctx_.getQueryOptions().exec_single_node_rows_threshold;
if (maxRowsProcessed < threshold) {
// Execute on a single node and disable codegen for small results
+ LOG.trace("Query is small enough to execute on a single node: maxRowsProcessed = "
+ + maxRowsProcessed);
ctx_.getQueryOptions().setNum_nodes(1);
ctx_.getQueryCtx().disable_codegen_hint = true;
if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index f6d6302..3265daf 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -266,9 +266,13 @@ public class SingleNodePlanner {
AnalyticPlanner analyticPlanner =
new AnalyticPlanner(analyticInfo, analyzer, ctx_);
MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo();
- List<Expr> groupingExprs = multiAggInfo != null ?
- multiAggInfo.getGroupingExprs() :
- Collections.<Expr>emptyList();
+ List<Expr> groupingExprs;
+ if (multiAggInfo != null) {
+ groupingExprs = multiAggInfo.getSubstGroupingExprs();
+ Preconditions.checkState(groupingExprs != null);
+ } else {
+ groupingExprs = Collections.emptyList();
+ }
List<Expr> inputPartitionExprs = new ArrayList<>();
root = analyticPlanner.createSingleNodePlan(
root, groupingExprs, inputPartitionExprs);
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
index a2038e4..b4856f3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
@@ -2921,3 +2921,93 @@ PLAN-ROOT SINK
partitions=4/4 files=4 size=460B
row-size=0B cardinality=8
====
+# Regression test for IMPALA-8790
+# Query block contains analytic functions and aggregations together. The GroupBy clause
+# references inline view columns.
+select string_col, int_col,
+ rank() over(partition by int_col order by count(bigint_col))
+from (select string_col, int_col, bigint_col from functional.alltypes) w
+group by string_col, int_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:EXCHANGE [UNPARTITIONED]
+|
+03:ANALYTIC
+| functions: rank()
+| partition by: int_col
+| order by: count(bigint_col) ASC
+| window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+| row-size=33B cardinality=100
+|
+02:SORT
+| order by: int_col ASC NULLS FIRST, count(bigint_col) ASC
+| row-size=25B cardinality=100
+|
+05:AGGREGATE [FINALIZE]
+| output: count:merge(bigint_col)
+| group by: string_col, int_col
+| row-size=25B cardinality=100
+|
+04:EXCHANGE [HASH(int_col)]
+|
+01:AGGREGATE [STREAMING]
+| output: count(bigint_col)
+| group by: string_col, int_col
+| row-size=25B cardinality=100
+|
+00:SCAN HDFS [functional.alltypes]
+ HDFS partitions=24/24 files=24 size=478.45KB
+ row-size=25B cardinality=7.30K
+====
+# Regression test for IMPALA-8790
+# Query block contains analytic functions and aggregations together. The GroupBy clause
+# references inline view columns. Coverage for distinct aggregation.
+select string_col, int_col,
+ rank() over(partition by int_col order by count(distinct bigint_col))
+from (select string_col, int_col, bigint_col from functional.alltypes) w
+group by string_col, int_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:EXCHANGE [UNPARTITIONED]
+|
+04:ANALYTIC
+| functions: rank()
+| partition by: int_col
+| order by: count(bigint_col) ASC
+| window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+| row-size=33B cardinality=100
+|
+03:SORT
+| order by: int_col ASC NULLS FIRST, count(bigint_col) ASC
+| row-size=25B cardinality=100
+|
+09:EXCHANGE [HASH(int_col)]
+|
+08:AGGREGATE [FINALIZE]
+| output: count:merge(bigint_col)
+| group by: string_col, int_col
+| row-size=25B cardinality=100
+|
+07:EXCHANGE [HASH(string_col,int_col)]
+|
+02:AGGREGATE [STREAMING]
+| output: count(bigint_col)
+| group by: string_col, int_col
+| row-size=25B cardinality=100
+|
+06:AGGREGATE
+| group by: string_col, int_col, bigint_col
+| row-size=25B cardinality=1.00K
+|
+05:EXCHANGE [HASH(int_col)]
+|
+01:AGGREGATE [STREAMING]
+| group by: string_col, int_col, bigint_col
+| row-size=25B cardinality=1.00K
+|
+00:SCAN HDFS [functional.alltypes]
+ HDFS partitions=24/24 files=24 size=478.45KB
+ row-size=25B cardinality=7.30K
+====