You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2017/12/08 23:28:01 UTC
[2/2] impala git commit: IMPALA-6286: Remove invalid runtime filter targets.
IMPALA-6286: Remove invalid runtime filter targets.
If the target expression of a runtime filter evaluates to a
non-NULL value for outer-join non-matches, then assigning
the filter below the nullable side of an outer join may
lead to incorrect query results.
See IMPALA-6286 for an example and explanation.
This patch adds a conservative check that prevents the
creation of runtime filters that could potentially
have such incorrect targets. Some safe opportunities
are deliberately missed to keep the code simple.
See RuntimeFilterGenerator#getTargetSlots().
Testing:
- added planner tests which passed locally
Change-Id: I88153eea9f4b5117df60366fad2bd91776b95298
Reviewed-on: http://gerrit.cloudera.org:8080/8783
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/11497c2a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/11497c2a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/11497c2a
Branch: refs/heads/master
Commit: 11497c2aa9b49c031a9237e18a142ddbd115188f
Parents: e94c608
Author: Alex Behm <al...@cloudera.com>
Authored: Wed Dec 6 15:11:38 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Dec 8 23:11:36 2017 +0000
----------------------------------------------------------------------
.../org/apache/impala/analysis/Analyzer.java | 73 +++++++++--------
.../org/apache/impala/planner/HdfsScanNode.java | 11 ++-
.../impala/planner/RuntimeFilterGenerator.java | 45 ++++++++---
.../PlannerTest/runtime-filter-propagation.test | 82 ++++++++++++++++++++
4 files changed, 168 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/11497c2a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 0ecc958..b84f9ef 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -18,7 +18,6 @@
package org.apache.impala.analysis;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -54,11 +53,9 @@ import org.apache.impala.common.IdGenerator;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
-import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.planner.PlanNode;
import org.apache.impala.rewrite.BetweenToCompoundRule;
-import org.apache.impala.rewrite.SimplifyDistinctFromRule;
import org.apache.impala.rewrite.EqualityDisjunctsToInRule;
import org.apache.impala.rewrite.ExprRewriteRule;
import org.apache.impala.rewrite.ExprRewriter;
@@ -68,6 +65,7 @@ import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
import org.apache.impala.rewrite.NormalizeCountStarRule;
import org.apache.impala.rewrite.NormalizeExprsRule;
import org.apache.impala.rewrite.SimplifyConditionalsRule;
+import org.apache.impala.rewrite.SimplifyDistinctFromRule;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TAccessEvent;
import org.apache.impala.thrift.TCatalogObjectType;
@@ -75,8 +73,13 @@ import org.apache.impala.thrift.TLineageGraph;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryOptions;
-import org.apache.impala.util.*;
-import org.apache.impala.util.Graph.*;
+import org.apache.impala.util.DisjointSet;
+import org.apache.impala.util.Graph.RandomAccessibleGraph;
+import org.apache.impala.util.Graph.SccCondensedGraph;
+import org.apache.impala.util.Graph.WritableGraph;
+import org.apache.impala.util.IntIterator;
+import org.apache.impala.util.ListMap;
+import org.apache.impala.util.TSessionStateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1477,17 +1480,20 @@ public class Analyzer {
}
/**
- * Returns a list of predicates that are fully bound by destTid. Predicates are derived
- * by replacing the slots of a source predicate with slots of the destTid, if every
- * source slot has a value transfer to a slot in destTid.
+ * Returns a list of predicates that are fully bound by destTid. The generated
+ * predicates are for optimization purposes and not required for query correctness.
+ * It is up to the caller to decide if a bound predicate should actually be used.
+ * Predicates are derived by replacing the slots of a source predicate with slots of
+ * the destTid, if every source slot has a value transfer to a slot in destTid.
* In particular, the returned list contains predicates that must be evaluated
* at a join node (bound to outer-joined tuple) but can also be safely evaluated by a
* plan node materializing destTid. Such predicates are not marked as assigned.
* All other inferred predicates are marked as assigned if 'markAssigned'
* is true. This function returns bound predicates regardless of whether the source
- * predicated have been assigned. It is up to the caller to decide if a bound predicate
- * should actually be used.
+ * predicates have been assigned.
* Destination slots in destTid can be ignored by passing them in ignoreSlots.
+ * Some bound predicates may be missed due to errors in backend expr evaluation
+ * or expr substitution.
* TODO: exclude UDFs from predicate propagation? their overloaded variants could
* have very different semantics
*/
@@ -1511,16 +1517,7 @@ public class Analyzer {
// Indicates whether there is value transfer from the source slots to slots that
// belong to an outer-joined tuple.
- boolean hasOuterJoinedTuple = false;
- for (SlotId srcSid: srcSids) {
- for (SlotId dst : getValueTransferTargets(srcSid)) {
- if (isOuterJoined(getTupleId(dst))) {
- hasOuterJoinedTuple = true;
- break;
- }
- }
- if (hasOuterJoinedTuple) break;
- }
+ boolean hasOuterJoinedTuple = hasOuterJoinedValueTransferTarget(srcSids);
// It is incorrect to propagate predicates into a plan subtree that is on the
// nullable side of an outer join if the predicate evaluates to true when all
@@ -1535,7 +1532,15 @@ public class Analyzer {
// TODO: Make the check precise by considering the blocks (analyzers) where the
// outer-joined tuples in the dest slot's equivalence classes appear
// relative to 'srcConjunct'.
- if (hasOuterJoinedTuple && isTrueWithNullSlots(srcConjunct)) continue;
+ try {
+ if (hasOuterJoinedTuple && isTrueWithNullSlots(srcConjunct)) continue;
+ } catch (InternalException e) {
+ // Expr evaluation failed in the backend. Skip 'srcConjunct' since we cannot
+ // determine whether propagation is safe.
+ LOG.warn("Skipping propagation of conjunct because backend evaluation failed: "
+ + srcConjunct.toSql(), e);
+ continue;
+ }
// if srcConjunct comes out of an OJ's On clause, we need to make sure it's the
// same as the one that makes destTid nullable
@@ -1629,6 +1634,19 @@ public class Analyzer {
}
/**
+ * Returns true if any of the given slot ids or their value-transfer targets belong
+ * to an outer-joined tuple.
+ */
+ public boolean hasOuterJoinedValueTransferTarget(List<SlotId> sids) {
+ for (SlotId srcSid: sids) {
+ for (SlotId dstSid: getValueTransferTargets(srcSid)) {
+ if (isOuterJoined(getTupleId(dstSid))) return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* For each slot equivalence class, adds/removes predicates from conjuncts such that it
* contains a minimum set of <lhsSlot> = <rhsSlot> predicates that establish the known
* equivalences between slots in lhsTids and rhsTids which must be disjoint. Preserves
@@ -1891,10 +1909,9 @@ public class Analyzer {
/**
* Returns true if 'p' evaluates to true when all its referenced slots are NULL,
- * false otherwise.
- * TODO: Can we avoid dealing with the exceptions thrown by analysis and eval?
+ * returns false otherwise. Throws if backend expression evaluation fails.
*/
- public boolean isTrueWithNullSlots(Expr p) {
+ public boolean isTrueWithNullSlots(Expr p) throws InternalException {
// Construct predicate with all SlotRefs substituted by NullLiterals.
List<SlotRef> slotRefs = Lists.newArrayList();
p.collect(Predicates.instanceOf(SlotRef.class), slotRefs);
@@ -1908,13 +1925,7 @@ public class Analyzer {
nullSmap.put(slotRef.clone(), NullLiteral.create(slotRef.getType()));
}
Expr nullTuplePred = p.substitute(nullSmap, this, false);
- try {
- return FeSupport.EvalPredicate(nullTuplePred, getQueryCtx());
- } catch (InternalException e) {
- Preconditions.checkState(false, "Failed to evaluate generated predicate: "
- + nullTuplePred.toSql() + "." + e.getMessage());
- }
- return true;
+ return FeSupport.EvalPredicate(nullTuplePred, getQueryCtx());
}
public TupleId getTupleId(SlotId slotId) {
http://git-wip-us.apache.org/repos/asf/impala/blob/11497c2a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 964119a..b13f435 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -57,6 +57,7 @@ import org.apache.impala.catalog.Type;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.InternalException;
import org.apache.impala.common.NotImplementedException;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.RuntimeEnv;
@@ -631,7 +632,15 @@ public class HdfsScanNode extends ScanNode {
// contain an entry for NULL and do not provide an indication about
// whether NULLs are present. A conjunct that evaluates to true on NULL
// cannot be evaluated purely on the dictionary.
- if (analyzer.isTrueWithNullSlots(conjunct)) continue;
+ try {
+ if (analyzer.isTrueWithNullSlots(conjunct)) continue;
+ } catch (InternalException e) {
+ // Expr evaluation failed in the backend. Skip this conjunct since we cannot
+ // determine whether it is safe to apply it against a dictionary.
+ LOG.warn("Skipping dictionary filter because backend evaluation failed: "
+ + conjunct.toSql(), e);
+ continue;
+ }
// TODO: Should there be a limit on the cost/structure of the conjunct?
Integer slotIdInt = slotIds.get(0).asInt();
http://git-wip-us.apache.org/repos/asf/impala/blob/11497c2a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 758e79d..d295bf5 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -30,6 +30,7 @@ import org.apache.impala.analysis.BinaryPredicate.Operator;
import org.apache.impala.analysis.CastExpr;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.IsNullPredicate;
import org.apache.impala.analysis.Predicate;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotId;
@@ -40,14 +41,15 @@ import org.apache.impala.analysis.TupleIsNullPredicate;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.Type;
-import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.IdGenerator;
+import org.apache.impala.common.InternalException;
import org.apache.impala.planner.JoinNode.DistributionMode;
-import org.apache.impala.planner.PlanNode;
import org.apache.impala.thrift.TRuntimeFilterDesc;
import org.apache.impala.thrift.TRuntimeFilterMode;
import org.apache.impala.thrift.TRuntimeFilterTargetDesc;
import org.apache.impala.thrift.TRuntimeFilterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -55,9 +57,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Class used for generating and assigning runtime filters to a query plan using
* runtime filter propagation. Runtime filter propagation is an optimization technique
@@ -138,7 +137,7 @@ public final class RuntimeFilterGenerator {
// Once set, it can't be unset.
private boolean finalized_ = false;
// The type of filter to build.
- private TRuntimeFilterType type_;
+ private final TRuntimeFilterType type_;
/**
* Internal representation of a runtime filter target.
@@ -264,16 +263,16 @@ public final class RuntimeFilterGenerator {
filterSrcNode.getChild(1).getTupleIds(), analyzer);
if (normalizedJoinConjunct == null) return null;
- Expr targetExpr = normalizedJoinConjunct.getChild(0);
+ // Ensure that the target expr does not contain TupleIsNull predicates as these
+ // can't be evaluated at a scan node.
+ Expr targetExpr =
+ TupleIsNullPredicate.unwrapExpr(normalizedJoinConjunct.getChild(0).clone());
Expr srcExpr = normalizedJoinConjunct.getChild(1);
Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, targetExpr);
Preconditions.checkNotNull(targetSlots);
if (targetSlots.isEmpty()) return null;
- // Ensure that the targer expr does not contain TupleIsNull predicates as these
- // can't be evaluated at a scan node.
- targetExpr = TupleIsNullPredicate.unwrapExpr(targetExpr.clone());
if (LOG.isTraceEnabled()) {
LOG.trace("Generating runtime filter from predicate " + joinPredicate);
}
@@ -285,7 +284,8 @@ public final class RuntimeFilterGenerator {
* Returns the ids of base table tuple slots on which a runtime filter expr can be
* applied. Due to the existence of equivalence classes, a filter expr may be
* applicable at multiple scan nodes. The returned slot ids are grouped by tuple id.
- * Returns an empty collection if the filter expr cannot be applied at a base table.
+ * Returns an empty collection if the filter expr cannot be applied at a base table
+ * or if applying the filter might lead to incorrect results.
*/
private static Map<TupleId, List<SlotId>> getTargetSlots(Analyzer analyzer,
Expr expr) {
@@ -293,6 +293,29 @@ public final class RuntimeFilterGenerator {
List<TupleId> tids = Lists.newArrayList();
List<SlotId> sids = Lists.newArrayList();
expr.getIds(tids, sids);
+
+ // IMPALA-6286: If the target expression evaluates to a non-NULL value for
+ // outer-join non-matches, then assigning the filter below the nullable side of
+ // an outer join may produce incorrect query results.
+ // This check is conservative but correct to keep the code simple. In particular,
+ // it would otherwise be difficult to identify incorrect runtime filter assignments
+ // through outer-joined inline views because the 'expr' has already been fully
+ // resolved. We rely on the value-transfer graph to check whether 'expr' could
+ // potentially be assigned below an outer-joined inline view.
+ if (analyzer.hasOuterJoinedValueTransferTarget(sids)) {
+ Expr isNotNullPred = new IsNullPredicate(expr, true);
+ isNotNullPred.analyzeNoThrow(analyzer);
+ try {
+ if (analyzer.isTrueWithNullSlots(isNotNullPred)) return Collections.emptyMap();
+ } catch (InternalException e) {
+ // Expr evaluation failed in the backend. Skip this runtime filter since we
+ // cannot determine whether it is safe to assign it.
+ LOG.warn("Skipping runtime filter because backend evaluation failed: "
+ + isNotNullPred.toSql(), e);
+ return Collections.emptyMap();
+ }
+ }
+
Map<TupleId, List<SlotId>> slotsByTid = Maps.newHashMap();
// We need to iterate over all the slots of 'expr' and check if they have
// equivalent slots that are bound by the same base table tuple(s).
http://git-wip-us.apache.org/repos/asf/impala/blob/11497c2a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
index 5e60d35..9ca806f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
@@ -1456,3 +1456,85 @@ PLAN-ROOT SINK
partitions=1/1 files=1 size=6.23MB
runtime filters: RF000 -> p_partkey, RF001 -> p_retailprice
====
+# IMPALA-6286: Runtime filter must not be assigned at scan 01 because that could
+# alter the query results due to the coalesce() in the join condition of join 04.
+select /* +straight_join */ 1 from functional.alltypestiny t1
+left outer join functional.alltypestiny t2
+ on t1.int_col = t2.int_col
+where coalesce(t2.id + 10, 100) in (select 100);
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [LEFT SEMI JOIN]
+| hash predicates: coalesce(t2.id + 10, 100) = `$a$1`.`$c$1`
+|
+|--02:UNION
+| constant-operands=1
+|
+03:HASH JOIN [LEFT OUTER JOIN]
+| hash predicates: t1.int_col = t2.int_col
+|
+|--01:SCAN HDFS [functional.alltypestiny t2]
+| partitions=4/4 files=4 size=460B
+|
+00:SCAN HDFS [functional.alltypestiny t1]
+ partitions=4/4 files=4 size=460B
+====
+# IMPALA-6286: Same as above but with an inline view.
+select /* +straight_join */ 1 from functional.alltypestiny t1
+left outer join (select * from functional.alltypestiny t2) v
+ on t1.int_col = v.int_col
+where coalesce(v.id + 10, 100) in (select 100);
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [LEFT SEMI JOIN]
+| hash predicates: coalesce(t2.id + 10, 100) = `$a$1`.`$c$1`
+|
+|--02:UNION
+| constant-operands=1
+|
+03:HASH JOIN [LEFT OUTER JOIN]
+| hash predicates: t1.int_col = t2.int_col
+|
+|--01:SCAN HDFS [functional.alltypestiny t2]
+| partitions=4/4 files=4 size=460B
+|
+00:SCAN HDFS [functional.alltypestiny t1]
+ partitions=4/4 files=4 size=460B
+====
+# IMPALA-6286: The runtime filter produced by inner join 05 can safely be assigned
+# at scan 01. It would also be safe to produce a runtime filter at join 06 and assign
+# it to scan 00, but our check is too conservative to recognize the opportunity.
+select /* +straight_join */ 1 from functional.alltypestiny t1
+left outer join functional.alltypestiny t2
+ on t1.int_col = t2.int_col
+inner join functional.alltypestiny t3
+ on t2.id = t3.id
+where coalesce(t2.id + 10, 100) in (select 100)
+---- PLAN
+PLAN-ROOT SINK
+|
+06:HASH JOIN [LEFT SEMI JOIN]
+| hash predicates: coalesce(t2.id + 10, 100) = `$a$1`.`$c$1`
+|
+|--03:UNION
+| constant-operands=1
+|
+05:HASH JOIN [INNER JOIN]
+| hash predicates: t2.id = t3.id
+| runtime filters: RF000 <- t3.id
+|
+|--02:SCAN HDFS [functional.alltypestiny t3]
+| partitions=4/4 files=4 size=460B
+|
+04:HASH JOIN [LEFT OUTER JOIN]
+| hash predicates: t1.int_col = t2.int_col
+|
+|--01:SCAN HDFS [functional.alltypestiny t2]
+| partitions=4/4 files=4 size=460B
+| runtime filters: RF000 -> t2.id
+|
+00:SCAN HDFS [functional.alltypestiny t1]
+ partitions=4/4 files=4 size=460B
+====