Posted to commits@spark.apache.org by li...@apache.org on 2017/09/07 05:15:28 UTC

spark git commit: [SPARK-21835][SQL][FOLLOW-UP] RewritePredicateSubquery should not produce unresolved query plans

Repository: spark
Updated Branches:
  refs/heads/master aad212547 -> ce7293c15


[SPARK-21835][SQL][FOLLOW-UP] RewritePredicateSubquery should not produce unresolved query plans

## What changes were proposed in this pull request?

This is a follow-up of #19050 to deal with the `ExistenceJoin` case: the joins that `RewritePredicateSubquery` builds for `EXISTS` and `IN` predicate subqueries can also end up with conflicting attributes on both sides, so they need the same de-duplication before the rewritten plan can resolve.
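
Concretely, the rewrite turns an `EXISTS` (or `IN`) predicate into an `ExistenceJoin` whose right side can reuse attributes that already appear on the left side. Using the `l`/`r` test tables from `SubquerySuite`, the query exercised by the new test hits exactly that case; the sketch below only illustrates the symptom and the expected outcome after this patch, and assumes a session where those test tables are registered:

```scala
// Illustration only: assumes a SparkSession `spark` with the SubquerySuite
// test tables `l` and `r` registered as temp views.
val df = spark.sql(
  """
    |SELECT * FROM l, r WHERE l.a = r.c + 1 AND
    |(EXISTS (SELECT * FROM r) OR l.a = r.c)
  """.stripMargin)

// The EXISTS predicate becomes an ExistenceJoin whose right side is another
// scan of `r`; without de-duplicating the conflicting attributes, both join
// sides would share attribute IDs and the optimized plan would stay unresolved.
val plan = df.queryExecution.optimizedPlan
assert(plan.resolved)
```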

## How was this patch tested?

Added a test to `SubquerySuite` that covers the `ExistenceJoin` case and asserts the optimized plan is resolved.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #19151 from viirya/SPARK-21835-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce7293c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce7293c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce7293c1

Branch: refs/heads/master
Commit: ce7293c150c71a872d20beda44b12dec9deca18d
Parents: aad2125
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Wed Sep 6 22:15:25 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Sep 6 22:15:25 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/optimizer/subquery.scala  | 11 +++++++----
 .../test/scala/org/apache/spark/sql/SubquerySuite.scala | 12 ++++++++++++
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce7293c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 7ff8915..64b2856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -49,12 +49,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
-  private def dedupJoin(joinPlan: Join): Join = joinPlan match {
+  private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
     // SPARK-21835: It is possible that the two sides of the join have conflicting attributes;
     // the produced join then becomes unresolved and breaks structural integrity. We should
     // de-duplicate conflicting attributes. We don't use transformation here because we only
     // care about the topmost join converted from the correlated predicate subquery.
-    case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) =>
+    case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
       val duplicates = right.outputSet.intersect(left.outputSet)
       if (duplicates.nonEmpty) {
         val aliasMap = AttributeMap(duplicates.map { dup =>
@@ -145,13 +145,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       e transformUp {
         case Exists(sub, conditions, _) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
-          newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+          // Deduplicate conflicting attributes if any.
+          newPlan = dedupJoin(
+            Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
           exists
         case In(value, Seq(ListQuery(sub, conditions, _, _))) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
           val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
           val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
-          newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions)
+          // Deduplicate conflicting attributes if any.
+          newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions))
           exists
       }
     }
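
The hunk above only shows the head of dedupJoin; the de-duplication it applies is elided from the diff. As a rough, illustrative approximation (not the verbatim patch): the conflicting attributes of the right child are re-aliased under a Project, and the join condition is rewritten to point at the fresh attributes, roughly like this:

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}

object DedupJoinSketch {
  // Sketch of the idea only: if the right side re-uses attributes from the left,
  // alias them (giving them fresh expression IDs) so the join becomes resolvable.
  def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
    case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
      val duplicates = right.outputSet.intersect(left.outputSet)
      if (duplicates.isEmpty) {
        j
      } else {
        // Fresh aliases (and thus fresh expression IDs) for every duplicated attribute.
        val aliasMap = AttributeMap(duplicates.map(d => d -> Alias(d, d.toString)()).toSeq)
        val newRight = Project(right.output.map(a => aliasMap.getOrElse(a, a)), right)
        // Rewrite the join condition to reference the aliased attributes on the right.
        val newCond = joinCond.map { cond =>
          cond.transform {
            case a: Attribute => aliasMap.get(a).map(_.toAttribute).getOrElse(a)
          }
        }
        Join(left, newRight, joinType, newCond)
      }
    case other => other
  }
}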

http://git-wip-us.apache.org/repos/asf/spark/blob/ce7293c1/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index ee6905e..8673dc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -938,4 +938,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 3") {
+    val sqlText =
+      """
+        |SELECT * FROM l, r WHERE l.a = r.c + 1 AND
+        |(EXISTS (SELECT * FROM r) OR l.a = r.c)
+      """.stripMargin
+    val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
+    val join = optimizedPlan.collectFirst { case j: Join => j }.get
+    assert(join.duplicateResolved)
+    assert(optimizedPlan.resolved)
+  }
 }

