You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/03/28 11:43:31 UTC
spark git commit: [SPARK-20094][SQL] Preventing push down of IN
subquery to Join operator
Repository: spark
Updated Branches:
refs/heads/master a9abff281 -> 91559d277
[SPARK-20094][SQL] Preventing push down of IN subquery to Join operator
## What changes were proposed in this pull request?
TPC-DS q45 fails because:
`ReorderJoin` collects all predicates and tries to put them into the join condition when creating an ordered join. If a predicate with an IN subquery (`ListQuery`) ends up in a join condition instead of a filter condition, `RewritePredicateSubquery.rewriteExistentialExpr` fails to convert the subquery to an `ExistenceJoin`, which results in an error.
We should prevent predicates containing an IN subquery from being pushed down into the Join operator.
## How was this patch tested?
Added a new test case to `FilterPushdownSuite`.
Author: wangzhenhua <wa...@huawei.com>
Closes #17428 from wzhfy/noSubqueryInJoinCond.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91559d27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91559d27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91559d27
Branch: refs/heads/master
Commit: 91559d277f42ee83b79f5d8eb7ba037cf5c108da
Parents: a9abff2
Author: wangzhenhua <wa...@huawei.com>
Authored: Tue Mar 28 13:43:23 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Tue Mar 28 13:43:23 2017 +0200
----------------------------------------------------------------------
.../sql/catalyst/expressions/predicates.scala | 6 ++++++
.../optimizer/FilterPushdownSuite.scala | 20 ++++++++++++++++++++
2 files changed, 26 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/91559d27/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index e5d1a1e..1235204 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -90,6 +90,12 @@ trait PredicateHelper {
* Returns true iff `expr` could be evaluated as a condition within join.
*/
protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
+ case l: ListQuery =>
+ // A ListQuery defines the query which we want to search in an IN subquery expression.
+ // Currently the only way to evaluate an IN subquery is to convert it to a
+ // LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule.
+ // It cannot be evaluated as part of a Join operator.
+ false
case e: SubqueryExpression =>
// non-correlated subquery will be replaced as literal
e.children.isEmpty
http://git-wip-us.apache.org/repos/asf/spark/blob/91559d27/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 6feea40..d846786 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -836,6 +836,26 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, answer)
}
+ test("SPARK-20094: don't push predicate with IN subquery into join condition") {
+ val x = testRelation.subquery('x)
+ val z = testRelation.subquery('z)
+ val w = testRelation1.subquery('w) // w is only used inside the IN subquery's ListQuery below
+
+ val queryPlan = x
+ .join(z) // no explicit join condition; both predicates start out in the Filter below
+ .where(("x.b".attr === "z.b".attr) &&
+ ("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr))))) // disjunction over x and z that contains an IN subquery
+ .analyze
+
+ val expectedPlan = x
+ .join(z, Inner, Some("x.b".attr === "z.b".attr)) // only the equi-join predicate is moved into the join condition
+ .where("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr)))) // the IN-subquery predicate must stay in a Filter above the join
+ .analyze
+
+ val optimized = Optimize.execute(queryPlan)
+ comparePlans(optimized, expectedPlan)
+ }
+
test("Window: predicate push down -- basic") {
val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org