Posted to commits@impala.apache.org by kw...@apache.org on 2016/12/06 19:07:33 UTC

[5/8] incubator-impala git commit: IMPALA-3167: Fix assignment of WHERE conjunct through grouping agg + OJ.

IMPALA-3167: Fix assignment of WHERE conjunct through grouping agg + OJ.

Background: We generally allow predicates to be assigned below the
nullable side of a left/right outer join. For example:

SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.id
WHERE t2.int_col < 10

The scan of 't2' picks up 't2.int_col < 10' via
Analyzer.getBoundPredicates() and recognizes that the predicate must
also be evaluated by a join later, so the predicate is not marked as
assigned. The join then picks up the unassigned predicate via
Analyzer.getUnassignedConjuncts().
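
To see why the join must evaluate the predicate again, here is a toy Java
model of the query above. It is not Impala code; the class, methods, and
data are hypothetical. After a LEFT OUTER JOIN, t1 rows without a match in
t2 are NULL-extended, and 't2.int_col < 10' is not TRUE for NULL, so those
rows must still be filtered after the join even though the scan of 't2'
already applied the predicate.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Impala code: evaluates
//   SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.id WHERE t2.int_col < 10
// on hypothetical data to show why the WHERE predicate, even if pushed into the
// scan of t2, must be evaluated again after the join.
public class OuterJoinPredicateDemo {
  // One output row of the join; t2IntCol is null when t1.id had no match in t2.
  static final class Row {
    final int t1Id;
    final Integer t2IntCol;
    Row(int t1Id, Integer t2IntCol) { this.t1Id = t1Id; this.t2IntCol = t2IntCol; }
  }

  // SQL semantics: NULL < 10 is not TRUE, so a NULL-extended row is filtered out.
  static boolean pred(Integer intCol) { return intCol != null && intCol < 10; }

  // Naive nested-loop LEFT OUTER JOIN; each t2 row is {id, int_col}.
  static List<Row> leftOuterJoin(List<Integer> t1Ids, List<int[]> t2) {
    List<Row> out = new ArrayList<>();
    for (int id : t1Ids) {
      boolean matched = false;
      for (int[] r : t2) {
        if (r[0] == id) { out.add(new Row(id, r[1])); matched = true; }
      }
      if (!matched) out.add(new Row(id, null)); // NULL-extend the nullable side
    }
    return out;
  }

  // The predicate evaluated by the scan of t2.
  static List<int[]> scanT2(List<int[]> t2) {
    List<int[]> out = new ArrayList<>();
    for (int[] r : t2) if (pred(r[1])) out.add(r);
    return out;
  }

  // The predicate evaluated again above the join.
  static List<Row> filterAfterJoin(List<Row> rows) {
    List<Row> out = new ArrayList<>();
    for (Row r : rows) if (pred(r.t2IntCol)) out.add(r);
    return out;
  }

  // Predicate only at the scan of t2: rows NULL-extended by the join survive.
  static List<Row> scanOnly(List<Integer> t1Ids, List<int[]> t2) {
    return leftOuterJoin(t1Ids, scanT2(t2));
  }

  // Predicate at the scan and again after the join: matches the query semantics.
  static List<Row> scanAndAfterJoin(List<Integer> t1Ids, List<int[]> t2) {
    return filterAfterJoin(leftOuterJoin(t1Ids, scanT2(t2)));
  }

  public static void main(String[] args) {
    List<Integer> t1Ids = List.of(1, 2, 3);
    List<int[]> t2 = List.of(new int[] {1, 5}, new int[] {2, 50});
    System.out.println(scanOnly(t1Ids, t2).size());         // 3 rows (wrong)
    System.out.println(scanAndAfterJoin(t1Ids, t2).size()); // 1 row (correct)
  }
}
```

Evaluating at the scan is still worthwhile because it reduces the rows fed
into the join; it is just not sufficient on its own.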

The bug was in our logic for detecting whether a bound predicate must
also be evaluated at a join node: it only considered whether the tuples
of the source or destination predicate were outer joined (plus other
conditions). The underlying assumption was that either the source or the
destination predicate is bound by a tuple produced by a TableRef, but in
the buggy query the source predicate is bound by an aggregation tuple, so
we incorrectly marked the bound predicate as assigned in
Analyzer.getBoundPredicates().
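
The same reasoning applies when a grouping aggregation sits on top of the
outer join, as in the planner test added by this commit. The toy Java model
below (hypothetical class, methods, and data; not Impala code) groups by a
column from the nullable side. Unmatched t1 rows form a NULL group, which
survives if the predicate is applied only at the scan of t2, which is the
effect of wrongly marking it as assigned there.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Impala code, of the planner test query: t1 LEFT OUTER JOIN t2 ON
// t1.int_col = t2.int_col, grouped by t2.id (v1.id) with sum(bigint_col), filtered by
// 'id < 10'. The grouping key comes from the nullable side of the join.
public class AggOverOuterJoinDemo {
  // SQL semantics: NULL < 10 is not TRUE.
  static boolean pred(Integer id) { return id != null && id < 10; }

  static void add(Map<Integer, Long> groups, Integer key, int value) {
    groups.put(key, groups.getOrDefault(key, 0L) + value);
  }

  // t1 rows are {int_col, bigint_col}; t2 rows are {int_col, id}. Returns id -> sum.
  static Map<Integer, Long> joinAndAggregate(List<int[]> t1, List<int[]> t2) {
    Map<Integer, Long> groups = new LinkedHashMap<>();
    for (int[] l : t1) {
      boolean matched = false;
      for (int[] r : t2) {
        if (l[0] == r[0]) { add(groups, r[1], l[1]); matched = true; }
      }
      if (!matched) add(groups, null, l[1]); // unmatched row -> NULL group key
    }
    return groups;
  }

  // The predicate evaluated by the scan of t2.
  static List<int[]> scanT2(List<int[]> t2) {
    List<int[]> out = new ArrayList<>();
    for (int[] r : t2) if (pred(r[1])) out.add(r);
    return out;
  }

  // The predicate evaluated on the group key after aggregation.
  static Map<Integer, Long> having(Map<Integer, Long> groups) {
    Map<Integer, Long> out = new LinkedHashMap<>();
    for (Map.Entry<Integer, Long> e : groups.entrySet()) {
      if (pred(e.getKey())) out.put(e.getKey(), e.getValue());
    }
    return out;
  }

  // Buggy assignment: predicate evaluated only at the t2 scan.
  static Map<Integer, Long> buggyPlan(List<int[]> t1, List<int[]> t2) {
    return joinAndAggregate(t1, scanT2(t2));
  }

  // Fixed assignment: predicate at the t2 scan and again above the join.
  static Map<Integer, Long> fixedPlan(List<int[]> t1, List<int[]> t2) {
    return having(joinAndAggregate(t1, scanT2(t2)));
  }

  public static void main(String[] args) {
    List<int[]> t1 = List.of(new int[] {1, 10}, new int[] {2, 20});
    List<int[]> t2 = List.of(new int[] {1, 5}, new int[] {2, 50});
    System.out.println(buggyPlan(t1, t2)); // {5=10, null=20}
    System.out.println(fixedPlan(t1, t2)); // {5=10}
  }
}
```

In the expected plan of the new planner test, the two evaluation points
correspond to 'predicates: t2.id < 10' on 01:SCAN HDFS and
'having: v1.id < 10' on 03:AGGREGATE.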

The fix is to conservatively not mark a bound predicate as assigned if
the slots it references have equivalent slots that belong to an
outer-joined tuple. As a result, a plan node may pick up the same
predicate twice, once via Analyzer.getBoundPredicates() and again via
Analyzer.getUnassignedConjuncts(). Such duplicates are now removed with
Expr.removeDuplicates().
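
The conservative rule can be sketched with a simplified model in which
slots and tuples are plain strings and the slot equivalence classes are
given up front. This is a hypothetical illustration, not Impala's Analyzer
API: the class, method, and parameter names are made up, and only the first
of the new conditions in the diff (hasOuterJoinedTuple combined with the
On-clause exemption) is modeled; the remaining evalAfterJoin() conditions
are omitted.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of the conservative rule; not Impala's Analyzer API.
public class ConservativeAssignment {
  // Returns true if any slot referenced by the source predicate has an equivalent slot
  // (including itself) that belongs to a tuple on the nullable side of an outer join.
  static boolean hasOuterJoinedTuple(List<String> srcSlots,
      Map<String, Set<String>> equivSlots, Map<String, String> slotToTuple,
      Set<String> outerJoinedTuples) {
    for (String slot : srcSlots) {
      for (String equiv : equivSlots.getOrDefault(slot, Set.of(slot))) {
        String tuple = slotToTuple.get(equiv);
        if (tuple != null && outerJoinedTuples.contains(tuple)) return true;
      }
    }
    return false;
  }

  // On-clause conjuncts are exempt because they can legitimately be assigned below
  // their originating join (see the comment in the Analyzer.java diff).
  static boolean mayMarkAssigned(boolean hasOuterJoinedTuple, boolean isOnClauseConjunct) {
    return !(hasOuterJoinedTuple && !isOnClauseConjunct);
  }

  public static void main(String[] args) {
    // In this model, v2.id is a grouping slot of the aggregation tuple "agg" and is
    // equivalent to t2.id, where t2 is on the nullable side of the outer join.
    Map<String, String> slotToTuple = Map.of("v2.id", "agg", "t2.id", "t2");
    Map<String, Set<String>> equivSlots = Map.of("v2.id", Set.of("v2.id", "t2.id"));
    Set<String> outerJoinedTuples = Set.of("t2");

    // Looking only at the source predicate's own tuple misses the outer join.
    System.out.println(outerJoinedTuples.contains(slotToTuple.get("v2.id"))); // false
    boolean has = hasOuterJoinedTuple(List.of("v2.id"), equivSlots, slotToTuple,
        outerJoinedTuples);
    System.out.println(mayMarkAssigned(has, false)); // false: evaluate again later
  }
}
```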

The following example illustrates how the same predicate can be assigned
twice:

SELECT * FROM (SELECT * FROM t t1) a LEFT OUTER JOIN t b ON a.id = b.id
WHERE a.id < 10

1. The predicate 'a.id < 10' gets migrated into the inline view.
   'a.id < 10' is marked as assigned but is still registered as
   a single-tid conjunct in the Analyzer for potential propagation.
2. The scan node of 't1' calls Analyzer.getBoundPredicates() and
   generates 't1.id < 10' based on the source predicate 'a.id < 10'.
3. The scan node of 't1' picks up the migrated conjunct 't1.id < 10'
   via Analyzer.getUnassignedConjuncts().
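
The deduplication can be sketched as a standalone generic method that
follows the same approach as the new Expr.removeDuplicates() in this commit:
copy the list, clear it, and re-add each element unless an equal one
(according to equals()) is already present. This keeps the first occurrence
of each element in its original order. In this sketch, Strings stand in for
Expr objects, java.util.ArrayList replaces Guava's Lists.newArrayList(), and
the class name DedupConjuncts is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the order-preserving dedup used by the new
// Expr.removeDuplicates(); elements are compared with equals().
public class DedupConjuncts {
  static <C> void removeDuplicates(List<C> l) {
    if (l == null) return;
    List<C> origList = new ArrayList<>(l);
    l.clear();
    // Keep the first occurrence of each element; later equal elements are dropped.
    for (C expr : origList) if (!l.contains(expr)) l.add(expr);
  }

  public static void main(String[] args) {
    // The t1 scan in the example collects 't1.id < 10' twice: once from
    // getBoundPredicates() and once from getUnassignedConjuncts().
    List<String> conjuncts = new ArrayList<>(List.of("t1.id < 10", "t1.id < 10"));
    removeDuplicates(conjuncts);
    System.out.println(conjuncts); // [t1.id < 10]
  }
}
```

The List.contains() check makes this quadratic in the list length, which
should be acceptable for the short conjunct lists of a single scan node.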

Change-Id: I774d13a13ad1e8fe82512df98dc29983bdd232eb
Reviewed-on: http://gerrit.cloudera.org:8080/4960
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f8377543
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f8377543
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f8377543

Branch: refs/heads/master
Commit: f8377543778b654336c978a4bb97efa3c1847441
Parents: b656f57
Author: Alex Behm <al...@cloudera.com>
Authored: Fri Nov 4 10:41:25 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Dec 6 07:24:01 2016 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/Analyzer.java    | 29 ++++++++++++--------
 .../java/org/apache/impala/analysis/Expr.java   | 19 ++-----------
 .../org/apache/impala/analysis/SelectStmt.java  |  2 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  2 +-
 .../impala/planner/SingleNodePlanner.java       |  2 +-
 .../queries/PlannerTest/outer-joins.test        | 29 ++++++++++++++++++++
 6 files changed, 52 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 4819342..6bba436 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -1223,10 +1223,11 @@ public class Analyzer {
   }
 
   /**
-   * Returns true if e must be evaluated by a join node. Note that it may still be
-   * safe to evaluate e elsewhere as well, but in any case the join must evaluate e.
+   * Returns true if 'e' must be evaluated after or by a join node. Note that it may
+   * still be safe to evaluate 'e' elsewhere as well, but in any case 'e' must be
+   * evaluated again by or after a join.
    */
-  public boolean evalByJoin(Expr e) {
+  public boolean evalAfterJoin(Expr e) {
     List<TupleId> tids = Lists.newArrayList();
     e.getIds(tids, null);
     if (tids.isEmpty()) return false;
@@ -1555,18 +1556,22 @@ public class Analyzer {
             }
           }
 
-          // Check if either srcConjunct or the generated predicate needs to be evaluated
-          // at a join node (IMPALA-2018).
-          boolean evalByJoin =
-              (evalByJoin(srcConjunct)
-               && (globalState_.ojClauseByConjunct.get(srcConjunct.getId())
-                != globalState_.outerJoinedTupleIds.get(srcTid)))
-              || (evalByJoin(p)
+          // IMPALA-2018/4379: Check if srcConjunct or the generated predicate need to
+          // be evaluated again at a later point in the plan, e.g., by a join that makes
+          // referenced tuples nullable. The first condition is conservative but takes
+          // into account that On-clause conjuncts can sometimes be legitimately assigned
+          // below their originating join.
+          boolean evalAfterJoin =
+              (hasOuterJoinedTuple && !srcConjunct.isOnClauseConjunct_)
+              || (evalAfterJoin(srcConjunct)
+                  && (globalState_.ojClauseByConjunct.get(srcConjunct.getId())
+                    != globalState_.outerJoinedTupleIds.get(srcTid)))
+              || (evalAfterJoin(p)
                   && (globalState_.ojClauseByConjunct.get(p.getId())
-                   != globalState_.outerJoinedTupleIds.get(destTid)));
+                    != globalState_.outerJoinedTupleIds.get(destTid)));
 
           // mark all bound predicates including duplicate ones
-          if (reverseValueTransfer && !evalByJoin) markConjunctAssigned(srcConjunct);
+          if (reverseValueTransfer && !evalAfterJoin) markConjunctAssigned(srcConjunct);
         }
 
         // check if we already created this predicate

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 779e252..87a2a12 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -851,22 +851,9 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
    */
   public static <C extends Expr> void removeDuplicates(List<C> l) {
     if (l == null) return;
-    ListIterator<C> it1 = l.listIterator();
-    while (it1.hasNext()) {
-      C e1 = it1.next();
-      ListIterator<C> it2 = l.listIterator();
-      boolean duplicate = false;
-      while (it2.hasNext()) {
-        C e2 = it2.next();
-          // only check up to but excluding e1
-        if (e1 == e2) break;
-        if (e1.equals(e2)) {
-          duplicate = true;
-          break;
-        }
-      }
-      if (duplicate) it1.remove();
-    }
+    List<C> origList = Lists.newArrayList(l);
+    l.clear();
+    for (C expr: origList) if (!l.contains(expr)) l.add(expr);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 80ffde5..74399d0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -333,7 +333,7 @@ public class SelectStmt extends QueryStmt {
         analyzer.getUnassignedConjuncts(getTableRefIds(), true);
     List<Expr> unassignedJoinConjuncts = Lists.newArrayList();
     for (Expr e: unassigned) {
-      if (analyzer.evalByJoin(e)) unassignedJoinConjuncts.add(e);
+      if (analyzer.evalAfterJoin(e)) unassignedJoinConjuncts.add(e);
     }
     List<Expr> baseTblJoinConjuncts =
         Expr.substituteList(unassignedJoinConjuncts, baseTblSmap_, analyzer, false);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 9642b97..d2eff19 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -292,7 +292,7 @@ public class HdfsScanNode extends ScanNode {
       // Mark those conjuncts as assigned that do not also need to be evaluated by a
       // subsequent semi or outer join.
       for (Expr conjunct: collectionConjuncts) {
-        if (!analyzer.evalByJoin(conjunct)) analyzer.markConjunctAssigned(conjunct);
+        if (!analyzer.evalAfterJoin(conjunct)) analyzer.markConjunctAssigned(conjunct);
       }
       if (!collectionConjuncts.isEmpty()) {
         analyzer.materializeSlots(collectionConjuncts);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 4bc8a88..3687a17 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -1195,7 +1195,6 @@ public class SingleNodePlanner {
    */
   private PlanNode createHdfsScanPlan(TableRef hdfsTblRef, boolean fastPartitionKeyScans,
       Analyzer analyzer) throws ImpalaException {
-    HdfsTable hdfsTable = (HdfsTable)hdfsTblRef.getTable();
     TupleDescriptor tupleDesc = hdfsTblRef.getDesc();
 
     // Get all predicates bound by the tuple.
@@ -1208,6 +1207,7 @@ public class SingleNodePlanner {
     analyzer.markConjunctsAssigned(unassigned);
 
     analyzer.createEquivConjuncts(tupleDesc.getId(), conjuncts);
+    Expr.removeDuplicates(conjuncts);
 
     // Do partition pruning before deciding which slots to materialize,
     // We might end up removing some predicates.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
index 2d5d6cd..95d16f8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
@@ -919,3 +919,32 @@ PLAN-ROOT SINK
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> b.int_col
 ====
+# IMPALA-3167: Test correct assignment of a WHERE-clause predicate through an inline view
+# that has a grouping aggregation and an outer join. The predicate can be assigned at the
+# scan on the nullable side of the outer join, but it must also be evaluated after the join.
+select v2.id, v2.s
+from (select v1.id, sum(bigint_col) s
+      from functional.alltypes t1
+      left outer join (select t2.int_col, t2.id
+                       from functional.alltypessmall t2) v1
+      on t1.int_col = v1.int_col
+      group by v1.id) v2
+where v2.id < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: sum(bigint_col)
+|  group by: t2.id
+|  having: v1.id < 10
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: t1.int_col = t2.int_col
+|
+|--01:SCAN HDFS [functional.alltypessmall t2]
+|     partitions=4/4 files=4 size=6.32KB
+|     predicates: t2.id < 10
+|
+00:SCAN HDFS [functional.alltypes t1]
+   partitions=24/24 files=24 size=478.45KB
+====