Posted to commits@spark.apache.org by li...@apache.org on 2017/09/06 14:42:25 UTC

spark git commit: [SPARK-21835][SQL] RewritePredicateSubquery should not produce unresolved query plans

Repository: spark
Updated Branches:
  refs/heads/master 64936c14a -> f2e22aebf


[SPARK-21835][SQL] RewritePredicateSubquery should not produce unresolved query plans

## What changes were proposed in this pull request?

Correlated predicate subqueries are rewritten into `Join` by the rule `RewritePredicateSubquery` during optimization.

It is possible that the two sides of the `Join` have conflicting attributes. When they do, the query plans produced by `RewritePredicateSubquery` become unresolved and break structural integrity.

We should check whether the `Join` has conflicting attributes and, if so, de-duplicate them by adding a `Project` that re-aliases the right side's output.
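
For illustration, here is a minimal sketch of the failing pattern, adapted from the first test added in this patch. The temp-view setup and the local `SparkSession` are simplifications so the snippet is self-contained; the actual test creates a parquet table:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Register a relation and refer to it from both the outer query and the
// NOT EXISTS subquery. RewritePredicateSubquery turns the predicate into a
// LeftAnti join whose two sides can expose attributes with identical
// expression IDs; without the dedup Project, such a join is unresolved.
Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("t1")

val df = spark.sql("SELECT * FROM t1 WHERE NOT EXISTS (SELECT * FROM t1)")

// With this patch the optimized plan stays resolved (the added tests also
// assert duplicateResolved on every rewritten Join node).
assert(df.queryExecution.optimizedPlan.resolved)
```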

## How was this patch tested?

Added tests.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #19050 from viirya/SPARK-21835.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2e22aeb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2e22aeb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2e22aeb

Branch: refs/heads/master
Commit: f2e22aebfe49cdfdf20f060305772971bcea9266
Parents: 64936c1
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Wed Sep 6 07:42:19 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Sep 6 07:42:19 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/subquery.scala | 39 ++++++++++--
 .../org/apache/spark/sql/SubquerySuite.scala    | 63 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f2e22aeb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 4386a10..7ff8915 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -49,6 +49,33 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  private def dedupJoin(joinPlan: Join): Join = joinPlan match {
+    // SPARK-21835: It is possible that the two sides of the join have conflicting attributes;
+    // the produced join then becomes unresolved and breaks structural integrity. We should
+    // de-duplicate the conflicting attributes. We don't use a transform here because we only
+    // care about the topmost join converted from a correlated predicate subquery.
+    case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) =>
+      val duplicates = right.outputSet.intersect(left.outputSet)
+      if (duplicates.nonEmpty) {
+        val aliasMap = AttributeMap(duplicates.map { dup =>
+          dup -> Alias(dup, dup.toString)()
+        }.toSeq)
+        val aliasedExpressions = right.output.map { ref =>
+          aliasMap.getOrElse(ref, ref)
+        }
+        val newRight = Project(aliasedExpressions, right)
+        val newJoinCond = joinCond.map { condExpr =>
+          condExpr transform {
+            case a: Attribute => aliasMap.getOrElse(a, a).toAttribute
+          }
+        }
+        Join(left, newRight, joinType, newJoinCond)
+      } else {
+        j
+      }
+    case _ => joinPlan
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case Filter(condition, child) =>
       val (withSubquery, withoutSubquery) =
@@ -64,14 +91,17 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       withSubquery.foldLeft(newFilter) {
         case (p, Exists(sub, conditions, _)) =>
           val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-          Join(outerPlan, sub, LeftSemi, joinCond)
+          // Deduplicate conflicting attributes if any.
+          dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
         case (p, Not(Exists(sub, conditions, _))) =>
           val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-          Join(outerPlan, sub, LeftAnti, joinCond)
+          // Deduplicate conflicting attributes if any.
+          dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond))
         case (p, In(value, Seq(ListQuery(sub, conditions, _, _)))) =>
           val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
           val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
-          Join(outerPlan, sub, LeftSemi, joinCond)
+          // Deduplicate conflicting attributes if any.
+          dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
         case (p, Not(In(value, Seq(ListQuery(sub, conditions, _, _))))) =>
           // This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr
           // Construct the condition. A NULL in one of the conditions is regarded as a positive
@@ -93,7 +123,8 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
           // will have the final conditions in the LEFT ANTI as
           // (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2)
           val pairs = (joinConds.map(c => Or(c, IsNull(c))) ++ conditions).reduceLeft(And)
-          Join(outerPlan, sub, LeftAnti, Option(pairs))
+          // Deduplicate conflicting attributes if any.
+          dedupJoin(Join(outerPlan, sub, LeftAnti, Option(pairs)))
         case (p, predicate) =>
           val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p)
           Project(p.output, Filter(newCond.get, inputPlan))

http://git-wip-us.apache.org/repos/asf/spark/blob/f2e22aeb/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 274694b..ee6905e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.test.SharedSQLContext
 
 class SubquerySuite extends QueryTest with SharedSQLContext {
@@ -875,4 +876,66 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       assert(e.message.contains("cannot resolve '`a`' given input columns: [t.i, t.j]"))
     }
   }
+
+  test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 1") {
+    withTable("t1") {
+      withTempPath { path =>
+        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
+        sql(s"CREATE TABLE t1 USING parquet LOCATION '${path.toURI}'")
+
+        val sqlText =
+          """
+            |SELECT * FROM t1
+            |WHERE
+            |NOT EXISTS (SELECT * FROM t1)
+          """.stripMargin
+        val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
+        val join = optimizedPlan.collectFirst { case j: Join => j }.get
+        assert(join.duplicateResolved)
+        assert(optimizedPlan.resolved)
+      }
+    }
+  }
+
+  test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 2") {
+    withTable("t1", "t2", "t3") {
+      withTempPath { path =>
+        val data = Seq((1, 1, 1), (2, 0, 2))
+
+        data.toDF("t1a", "t1b", "t1c").write.parquet(path.getCanonicalPath + "/t1")
+        data.toDF("t2a", "t2b", "t2c").write.parquet(path.getCanonicalPath + "/t2")
+        data.toDF("t3a", "t3b", "t3c").write.parquet(path.getCanonicalPath + "/t3")
+
+        sql(s"CREATE TABLE t1 USING parquet LOCATION '${path.toURI}/t1'")
+        sql(s"CREATE TABLE t2 USING parquet LOCATION '${path.toURI}/t2'")
+        sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}/t3'")
+
+        val sqlText =
+          s"""
+             |SELECT *
+             |FROM   (SELECT *
+             |        FROM   t2
+             |        WHERE  t2c IN (SELECT t1c
+             |                       FROM   t1
+             |                       WHERE  t1a = t2a)
+             |        UNION
+             |        SELECT *
+             |        FROM   t3
+             |        WHERE  t3a IN (SELECT t2a
+             |                       FROM   t2
+             |                       UNION ALL
+             |                       SELECT t1a
+             |                       FROM   t1
+             |                       WHERE  t1b > 0)) t4
+             |WHERE  t4.t2b IN (SELECT Min(t3b)
+             |                          FROM   t3
+             |                          WHERE  t4.t2a = t3a)
+           """.stripMargin
+        val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
+        val joinNodes = optimizedPlan.collect { case j: Join => j }
+        joinNodes.foreach(j => assert(j.duplicateResolved))
+        assert(optimizedPlan.resolved)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org