You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/03/28 11:43:31 UTC

spark git commit: [SPARK-20094][SQL] Preventing push down of IN subquery to Join operator

Repository: spark
Updated Branches:
  refs/heads/master a9abff281 -> 91559d277


[SPARK-20094][SQL] Preventing push down of IN subquery to Join operator

## What changes were proposed in this pull request?

TPCDS q45 fails because:
`ReorderJoin` collects all predicates and tries to put them into the join condition when creating an ordered join. If a predicate with an IN subquery (`ListQuery`) ends up in a join condition instead of a filter condition, `RewritePredicateSubquery.rewriteExistentialExpr` fails to convert the subquery to an `ExistenceJoin`, which results in an error.

We should prevent predicates containing an IN subquery from being pushed down into the Join operator.

## How was this patch tested?

Add a new test case in `FilterPushdownSuite`.

Author: wangzhenhua <wa...@huawei.com>

Closes #17428 from wzhfy/noSubqueryInJoinCond.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91559d27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91559d27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91559d27

Branch: refs/heads/master
Commit: 91559d277f42ee83b79f5d8eb7ba037cf5c108da
Parents: a9abff2
Author: wangzhenhua <wa...@huawei.com>
Authored: Tue Mar 28 13:43:23 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Tue Mar 28 13:43:23 2017 +0200

----------------------------------------------------------------------
 .../sql/catalyst/expressions/predicates.scala   |  6 ++++++
 .../optimizer/FilterPushdownSuite.scala         | 20 ++++++++++++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/91559d27/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index e5d1a1e..1235204 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -90,6 +90,12 @@ trait PredicateHelper {
    * Returns true iff `expr` could be evaluated as a condition within join.
    */
   protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
+    case l: ListQuery =>
+      // A ListQuery defines the query which we want to search in an IN subquery expression.
+      // Currently the only way to evaluate an IN subquery is to convert it to a
+      // LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule.
+      // It cannot be evaluated as part of a Join operator.
+      false
     case e: SubqueryExpression =>
       // non-correlated subquery will be replaced as literal
       e.children.isEmpty

http://git-wip-us.apache.org/repos/asf/spark/blob/91559d27/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 6feea40..d846786 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -836,6 +836,26 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, answer)
   }
 
+  test("SPARK-20094: don't push predicate with IN subquery into join condition") {
+    val x = testRelation.subquery('x)
+    val z = testRelation.subquery('z)
+    val w = testRelation1.subquery('w)
+
+    val queryPlan = x
+      .join(z)
+      .where(("x.b".attr === "z.b".attr) &&
+        ("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr)))))
+      .analyze
+
+    val expectedPlan = x
+      .join(z, Inner, Some("x.b".attr === "z.b".attr))
+      .where("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr))))
+      .analyze
+
+    val optimized = Optimize.execute(queryPlan)
+    comparePlans(optimized, expectedPlan)
+  }
+
   test("Window: predicate push down -- basic") {
     val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org