You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2017/12/08 23:28:01 UTC

[2/2] impala git commit: IMPALA-6286: Remove invalid runtime filter targets.

IMPALA-6286: Remove invalid runtime filter targets.

If the target expression of a runtime filter evaluates to a
non-NULL value for outer-join non-matches, then assigning
the filter below the nullable side of an outer join may
lead to incorrect query results.
See IMPALA-6286 for an example and explanation.

This patch adds a conservative check that prevents the
creation of runtime filters that could potentially
have such incorrect targets. Some safe opportunities
are deliberately missed to keep the code simple.
See RuntimeFilterGenerator#getTargetSlots().

Testing:
- added planner tests which passed locally

Change-Id: I88153eea9f4b5117df60366fad2bd91776b95298
Reviewed-on: http://gerrit.cloudera.org:8080/8783
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/11497c2a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/11497c2a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/11497c2a

Branch: refs/heads/master
Commit: 11497c2aa9b49c031a9237e18a142ddbd115188f
Parents: e94c608
Author: Alex Behm <al...@cloudera.com>
Authored: Wed Dec 6 15:11:38 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Dec 8 23:11:36 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/Analyzer.java    | 73 +++++++++--------
 .../org/apache/impala/planner/HdfsScanNode.java | 11 ++-
 .../impala/planner/RuntimeFilterGenerator.java  | 45 ++++++++---
 .../PlannerTest/runtime-filter-propagation.test | 82 ++++++++++++++++++++
 4 files changed, 168 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/11497c2a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 0ecc958..b84f9ef 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -18,7 +18,6 @@
 package org.apache.impala.analysis;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -54,11 +53,9 @@ import org.apache.impala.common.IdGenerator;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
-import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.rewrite.BetweenToCompoundRule;
-import org.apache.impala.rewrite.SimplifyDistinctFromRule;
 import org.apache.impala.rewrite.EqualityDisjunctsToInRule;
 import org.apache.impala.rewrite.ExprRewriteRule;
 import org.apache.impala.rewrite.ExprRewriter;
@@ -68,6 +65,7 @@ import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
 import org.apache.impala.rewrite.NormalizeCountStarRule;
 import org.apache.impala.rewrite.NormalizeExprsRule;
 import org.apache.impala.rewrite.SimplifyConditionalsRule;
+import org.apache.impala.rewrite.SimplifyDistinctFromRule;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -75,8 +73,13 @@ import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
-import org.apache.impala.util.*;
-import org.apache.impala.util.Graph.*;
+import org.apache.impala.util.DisjointSet;
+import org.apache.impala.util.Graph.RandomAccessibleGraph;
+import org.apache.impala.util.Graph.SccCondensedGraph;
+import org.apache.impala.util.Graph.WritableGraph;
+import org.apache.impala.util.IntIterator;
+import org.apache.impala.util.ListMap;
+import org.apache.impala.util.TSessionStateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1477,17 +1480,20 @@ public class Analyzer {
   }
 
   /**
-   * Returns a list of predicates that are fully bound by destTid. Predicates are derived
-   * by replacing the slots of a source predicate with slots of the destTid, if every
-   * source slot has a value transfer to a slot in destTid.
+   * Returns a list of predicates that are fully bound by destTid. The generated
+   * predicates are for optimization purposes and not required for query correctness.
+   * It is up to the caller to decide if a bound predicate should actually be used.
+   * Predicates are derived by replacing the slots of a source predicate with slots of
+   * the destTid, if every source slot has a value transfer to a slot in destTid.
    * In particular, the returned list contains predicates that must be evaluated
    * at a join node (bound to outer-joined tuple) but can also be safely evaluated by a
    * plan node materializing destTid. Such predicates are not marked as assigned.
    * All other inferred predicates are marked as assigned if 'markAssigned'
    * is true. This function returns bound predicates regardless of whether the source
-   * predicated have been assigned. It is up to the caller to decide if a bound predicate
-   * should actually be used.
+   * predicates have been assigned.
    * Destination slots in destTid can be ignored by passing them in ignoreSlots.
+   * Some bound predicates may be missed due to errors in backend expr evaluation
+   * or expr substitution.
    * TODO: exclude UDFs from predicate propagation? their overloaded variants could
    * have very different semantics
    */
@@ -1511,16 +1517,7 @@ public class Analyzer {
 
       // Indicates whether there is value transfer from the source slots to slots that
       // belong to an outer-joined tuple.
-      boolean hasOuterJoinedTuple = false;
-      for (SlotId srcSid: srcSids) {
-        for (SlotId dst : getValueTransferTargets(srcSid)) {
-          if (isOuterJoined(getTupleId(dst))) {
-            hasOuterJoinedTuple = true;
-            break;
-          }
-        }
-        if (hasOuterJoinedTuple) break;
-      }
+      boolean hasOuterJoinedTuple = hasOuterJoinedValueTransferTarget(srcSids);
 
       // It is incorrect to propagate predicates into a plan subtree that is on the
       // nullable side of an outer join if the predicate evaluates to true when all
@@ -1535,7 +1532,15 @@ public class Analyzer {
       // TODO: Make the check precise by considering the blocks (analyzers) where the
       // outer-joined tuples in the dest slot's equivalence classes appear
       // relative to 'srcConjunct'.
-      if (hasOuterJoinedTuple && isTrueWithNullSlots(srcConjunct)) continue;
+      try {
+        if (hasOuterJoinedTuple && isTrueWithNullSlots(srcConjunct)) continue;
+      } catch (InternalException e) {
+        // Expr evaluation failed in the backend. Skip 'srcConjunct' since we cannot
+        // determine whether propagation is safe.
+        LOG.warn("Skipping propagation of conjunct because backend evaluation failed: "
+            + srcConjunct.toSql(), e);
+        continue;
+      }
 
       // if srcConjunct comes out of an OJ's On clause, we need to make sure it's the
       // same as the one that makes destTid nullable
@@ -1629,6 +1634,19 @@ public class Analyzer {
   }
 
   /**
+   * Returns true if any of the given slot ids or their value-transfer targets belong
+   * to an outer-joined tuple.
+   */
+  public boolean hasOuterJoinedValueTransferTarget(List<SlotId> sids) {
+    for (SlotId srcSid: sids) {
+      for (SlotId dstSid: getValueTransferTargets(srcSid)) {
+        if (isOuterJoined(getTupleId(dstSid))) return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * For each slot equivalence class, adds/removes predicates from conjuncts such that it
    * contains a minimum set of <lhsSlot> = <rhsSlot> predicates that establish the known
    * equivalences between slots in lhsTids and rhsTids which must be disjoint. Preserves
@@ -1891,10 +1909,9 @@ public class Analyzer {
 
   /**
    * Returns true if 'p' evaluates to true when all its referenced slots are NULL,
-   * false otherwise.
-   * TODO: Can we avoid dealing with the exceptions thrown by analysis and eval?
+   * returns false otherwise. Throws if backend expression evaluation fails.
    */
-  public boolean isTrueWithNullSlots(Expr p) {
+  public boolean isTrueWithNullSlots(Expr p) throws InternalException {
     // Construct predicate with all SlotRefs substituted by NullLiterals.
     List<SlotRef> slotRefs = Lists.newArrayList();
     p.collect(Predicates.instanceOf(SlotRef.class), slotRefs);
@@ -1908,13 +1925,7 @@ public class Analyzer {
         nullSmap.put(slotRef.clone(), NullLiteral.create(slotRef.getType()));
     }
     Expr nullTuplePred = p.substitute(nullSmap, this, false);
-    try {
-      return FeSupport.EvalPredicate(nullTuplePred, getQueryCtx());
-    } catch (InternalException e) {
-      Preconditions.checkState(false, "Failed to evaluate generated predicate: "
-          + nullTuplePred.toSql() + "." + e.getMessage());
-    }
-    return true;
+    return FeSupport.EvalPredicate(nullTuplePred, getQueryCtx());
   }
 
   public TupleId getTupleId(SlotId slotId) {

http://git-wip-us.apache.org/repos/asf/impala/blob/11497c2a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 964119a..b13f435 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -57,6 +57,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
@@ -631,7 +632,15 @@ public class HdfsScanNode extends ScanNode {
       // contain an entry for NULL and do not provide an indication about
       // whether NULLs are present. A conjunct that evaluates to true on NULL
       // cannot be evaluated purely on the dictionary.
-      if (analyzer.isTrueWithNullSlots(conjunct)) continue;
+      try {
+        if (analyzer.isTrueWithNullSlots(conjunct)) continue;
+      } catch (InternalException e) {
+        // Expr evaluation failed in the backend. Skip this conjunct since we cannot
+        // determine whether it is safe to apply it against a dictionary.
+        LOG.warn("Skipping dictionary filter because backend evaluation failed: "
+            + conjunct.toSql(), e);
+        continue;
+      }
 
       // TODO: Should there be a limit on the cost/structure of the conjunct?
       Integer slotIdInt = slotIds.get(0).asInt();

http://git-wip-us.apache.org/repos/asf/impala/blob/11497c2a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 758e79d..d295bf5 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -30,6 +30,7 @@ import org.apache.impala.analysis.BinaryPredicate.Operator;
 import org.apache.impala.analysis.CastExpr;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.Predicate;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotId;
@@ -40,14 +41,15 @@ import org.apache.impala.analysis.TupleIsNullPredicate;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
-import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.IdGenerator;
+import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
-import org.apache.impala.planner.PlanNode;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
 import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TRuntimeFilterTargetDesc;
 import org.apache.impala.thrift.TRuntimeFilterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -55,9 +57,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Class used for generating and assigning runtime filters to a query plan using
  * runtime filter propagation. Runtime filter propagation is an optimization technique
@@ -138,7 +137,7 @@ public final class RuntimeFilterGenerator {
     // Once set, it can't be unset.
     private boolean finalized_ = false;
     // The type of filter to build.
-    private TRuntimeFilterType type_;
+    private final TRuntimeFilterType type_;
 
     /**
      * Internal representation of a runtime filter target.
@@ -264,16 +263,16 @@ public final class RuntimeFilterGenerator {
               filterSrcNode.getChild(1).getTupleIds(), analyzer);
       if (normalizedJoinConjunct == null) return null;
 
-      Expr targetExpr = normalizedJoinConjunct.getChild(0);
+      // Ensure that the target expr does not contain TupleIsNull predicates as these
+      // can't be evaluated at a scan node.
+      Expr targetExpr =
+          TupleIsNullPredicate.unwrapExpr(normalizedJoinConjunct.getChild(0).clone());
       Expr srcExpr = normalizedJoinConjunct.getChild(1);
 
       Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, targetExpr);
       Preconditions.checkNotNull(targetSlots);
       if (targetSlots.isEmpty()) return null;
 
-      // Ensure that the targer expr does not contain TupleIsNull predicates as these
-      // can't be evaluated at a scan node.
-      targetExpr = TupleIsNullPredicate.unwrapExpr(targetExpr.clone());
       if (LOG.isTraceEnabled()) {
         LOG.trace("Generating runtime filter from predicate " + joinPredicate);
       }
@@ -285,7 +284,8 @@ public final class RuntimeFilterGenerator {
      * Returns the ids of base table tuple slots on which a runtime filter expr can be
      * applied. Due to the existence of equivalence classes, a filter expr may be
      * applicable at multiple scan nodes. The returned slot ids are grouped by tuple id.
-     * Returns an empty collection if the filter expr cannot be applied at a base table.
+     * Returns an empty collection if the filter expr cannot be applied at a base table
+     * or if applying the filter might lead to incorrect results.
      */
     private static Map<TupleId, List<SlotId>> getTargetSlots(Analyzer analyzer,
         Expr expr) {
@@ -293,6 +293,29 @@ public final class RuntimeFilterGenerator {
       List<TupleId> tids = Lists.newArrayList();
       List<SlotId> sids = Lists.newArrayList();
       expr.getIds(tids, sids);
+
+      // IMPALA-6286: If the target expression evaluates to a non-NULL value for
+      // outer-join non-matches, then assigning the filter below the nullable side of
+      // an outer join may produce incorrect query results.
+      // This check is conservative but correct to keep the code simple. In particular,
+      // it would otherwise be difficult to identify incorrect runtime filter assignments
+      // through outer-joined inline views because the 'expr' has already been fully
+      // resolved. We rely on the value-transfer graph to check whether 'expr' could
+      // potentially be assigned below an outer-joined inline view.
+      if (analyzer.hasOuterJoinedValueTransferTarget(sids)) {
+        Expr isNotNullPred = new IsNullPredicate(expr, true);
+        isNotNullPred.analyzeNoThrow(analyzer);
+        try {
+          if (analyzer.isTrueWithNullSlots(isNotNullPred)) return Collections.emptyMap();
+        } catch (InternalException e) {
+          // Expr evaluation failed in the backend. Skip this runtime filter since we
+          // cannot determine whether it is safe to assign it.
+          LOG.warn("Skipping runtime filter because backend evaluation failed: "
+              + isNotNullPred.toSql(), e);
+          return Collections.emptyMap();
+        }
+      }
+
       Map<TupleId, List<SlotId>> slotsByTid = Maps.newHashMap();
       // We need to iterate over all the slots of 'expr' and check if they have
       // equivalent slots that are bound by the same base table tuple(s).

http://git-wip-us.apache.org/repos/asf/impala/blob/11497c2a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
index 5e60d35..9ca806f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
@@ -1456,3 +1456,85 @@ PLAN-ROOT SINK
    partitions=1/1 files=1 size=6.23MB
    runtime filters: RF000 -> p_partkey, RF001 -> p_retailprice
 ====
+# IMPALA-6286: Runtime filter must not be assigned at scan 01 because that could
+# alter the query results due to the coalesce() in the join condition of join 04.
+select /* +straight_join */ 1 from functional.alltypestiny t1
+left outer join functional.alltypestiny t2
+  on t1.int_col = t2.int_col
+where coalesce(t2.id + 10, 100) in (select 100);
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: coalesce(t2.id + 10, 100) = `$a$1`.`$c$1`
+|
+|--02:UNION
+|     constant-operands=1
+|
+03:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: t1.int_col = t2.int_col
+|
+|--01:SCAN HDFS [functional.alltypestiny t2]
+|     partitions=4/4 files=4 size=460B
+|
+00:SCAN HDFS [functional.alltypestiny t1]
+   partitions=4/4 files=4 size=460B
+====
+# IMPALA-6286: Same as above but with an inline view.
+select /* +straight_join */ 1 from functional.alltypestiny t1
+left outer join (select * from functional.alltypestiny t2) v
+  on t1.int_col = v.int_col
+where coalesce(v.id + 10, 100) in (select 100);
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: coalesce(t2.id + 10, 100) = `$a$1`.`$c$1`
+|
+|--02:UNION
+|     constant-operands=1
+|
+03:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: t1.int_col = t2.int_col
+|
+|--01:SCAN HDFS [functional.alltypestiny t2]
+|     partitions=4/4 files=4 size=460B
+|
+00:SCAN HDFS [functional.alltypestiny t1]
+   partitions=4/4 files=4 size=460B
+====
+# IMPALA-6286: The runtime filter produced by inner join 05 can safely be assigned
+# at scan 01. It would also be safe to produce a runtime filter at join 06 and assign
+# it to scan 00, but our check is too conservative to recognize the opportunity.
+select /* +straight_join */ 1 from functional.alltypestiny t1
+left outer join functional.alltypestiny t2
+  on t1.int_col = t2.int_col
+inner join functional.alltypestiny t3
+  on t2.id = t3.id
+where coalesce(t2.id + 10, 100) in (select 100)
+---- PLAN
+PLAN-ROOT SINK
+|
+06:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: coalesce(t2.id + 10, 100) = `$a$1`.`$c$1`
+|
+|--03:UNION
+|     constant-operands=1
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: t2.id = t3.id
+|  runtime filters: RF000 <- t3.id
+|
+|--02:SCAN HDFS [functional.alltypestiny t3]
+|     partitions=4/4 files=4 size=460B
+|
+04:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: t1.int_col = t2.int_col
+|
+|--01:SCAN HDFS [functional.alltypestiny t2]
+|     partitions=4/4 files=4 size=460B
+|     runtime filters: RF000 -> t2.id
+|
+00:SCAN HDFS [functional.alltypestiny t1]
+   partitions=4/4 files=4 size=460B
+====