Posted to commits@spark.apache.org by do...@apache.org on 2019/01/05 12:43:53 UTC

[spark] branch branch-2.3 updated: [SPARK-26078][SQL][BACKPORT-2.3] Dedup self-join attributes on IN subqueries

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new d618d27  [SPARK-26078][SQL][BACKPORT-2.3] Dedup self-join attributes on IN subqueries
d618d27 is described below

commit d618d27ac86b17659ec9cb5b3fa5d14612542ee8
Author: Marco Gaido <ma...@gmail.com>
AuthorDate: Sat Jan 5 04:43:38 2019 -0800

    [SPARK-26078][SQL][BACKPORT-2.3] Dedup self-join attributes on IN subqueries
    
    ## What changes were proposed in this pull request?
    
    When there is a self-join as a result of an IN subquery, the join condition may be invalid, resulting in trivially true predicates that return wrong results.
    
    The PR deduplicates the subquery output by aliasing the conflicting attributes, so the rewritten join no longer compares an attribute with itself.
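    
    As an illustration (a sketch distilled from the unit test added in this
    patch, not additional commentary from the author): two temp views with
    identical schemas turn the IN subquery into a "fake" self-join.
    
        import spark.implicits._  // for toDF on local Seqs
        // Both views expose the same attribute names, so the subquery's
        // `id` conflicts with the outer plan's `id`.
        Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("a")
        Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("b")
        spark.sql("""
          SELECT id, num, source FROM (
            SELECT id, num, 'a' AS source FROM a
            UNION ALL
            SELECT id, num, 'b' AS source FROM b
          ) AS c WHERE c.id IN (SELECT id FROM b WHERE num = 2)
        """).show()
    
    Without the fix, rewriting the IN predicate into a LeftSemi join can
    produce a condition that compares an attribute with itself (e.g.
    id#3 = id#3), which is trivially true and lets non-matching rows through.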
    
    ## How was this patch tested?
    
    Added a unit test in `SubquerySuite`.
    
    Closes #23450 from mgaido91/SPARK-26078_2.3.
    
    Authored-by: Marco Gaido <ma...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/catalyst/optimizer/subquery.scala    | 99 ++++++++++++++--------
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 36 ++++++++
 2 files changed, 98 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 709db6d..d87f599 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -49,31 +50,52 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
-  private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
+  private def buildJoin(
+      outerPlan: LogicalPlan,
+      subplan: LogicalPlan,
+      joinType: JoinType,
+      condition: Option[Expression]): Join = {
+    // Deduplicate conflicting attributes if any.
+    val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan, None, condition)
+    Join(outerPlan, dedupSubplan, joinType, condition)
+  }
+
+  private def dedupSubqueryOnSelfJoin(
+      outerPlan: LogicalPlan,
+      subplan: LogicalPlan,
+      valuesOpt: Option[Seq[Expression]],
+      condition: Option[Expression] = None): LogicalPlan = {
     // SPARK-21835: It is possible that the two sides of the join have conflicting attributes,
     // the produced join then becomes unresolved and breaks structural integrity. We should
-    // de-duplicate conflicting attributes. We don't use transformation here because we only
-    // care about the most top join converted from correlated predicate subquery.
-    case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
-      val duplicates = right.outputSet.intersect(left.outputSet)
-      if (duplicates.nonEmpty) {
-        val aliasMap = AttributeMap(duplicates.map { dup =>
-          dup -> Alias(dup, dup.toString)()
-        }.toSeq)
-        val aliasedExpressions = right.output.map { ref =>
-          aliasMap.getOrElse(ref, ref)
-        }
-        val newRight = Project(aliasedExpressions, right)
-        val newJoinCond = joinCond.map { condExpr =>
-          condExpr transform {
-            case a: Attribute => aliasMap.getOrElse(a, a).toAttribute
+    // de-duplicate conflicting attributes.
+    // SPARK-26078: it may also happen that the subquery has conflicting attributes with the outer
+    // values. In this case, the resulting join would contain trivially true conditions (e.g.
+    // id#3 = id#3) which cannot be de-duplicated afterwards. In this method, if there are conflicting
+    // attributes in the join condition, the subquery's conflicting attributes are changed using
+    // a projection which aliases them and resolves the problem.
+    val outerReferences = valuesOpt.map(values =>
+      AttributeSet(values.flatMap(_.references))).getOrElse(AttributeSet.empty)
+    val outerRefs = outerPlan.outputSet ++ outerReferences
+    val duplicates = outerRefs.intersect(subplan.outputSet)
+    if (duplicates.nonEmpty) {
+      condition.foreach { e =>
+          val conflictingAttrs = e.references.intersect(duplicates)
+          if (conflictingAttrs.nonEmpty) {
+            throw new AnalysisException("Found conflicting attributes " +
+              s"${conflictingAttrs.mkString(",")} in the condition joining outer plan:\n  " +
+              s"$outerPlan\nand subplan:\n  $subplan")
           }
-        }
-        Join(left, newRight, joinType, newJoinCond)
-      } else {
-        j
       }
-    case _ => joinPlan
+      val rewrites = AttributeMap(duplicates.map { dup =>
+        dup -> Alias(dup, dup.toString)()
+      }.toSeq)
+      val aliasedExpressions = subplan.output.map { ref =>
+        rewrites.getOrElse(ref, ref)
+      }
+      Project(aliasedExpressions, subplan)
+    } else {
+      subplan
+    }
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -91,17 +113,17 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       withSubquery.foldLeft(newFilter) {
         case (p, Exists(sub, conditions, _)) =>
           val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-          // Deduplicate conflicting attributes if any.
-          dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
+          buildJoin(outerPlan, sub, LeftSemi, joinCond)
         case (p, Not(Exists(sub, conditions, _))) =>
           val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-          // Deduplicate conflicting attributes if any.
-          dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond))
+          buildJoin(outerPlan, sub, LeftAnti, joinCond)
         case (p, In(value, Seq(ListQuery(sub, conditions, _, _)))) =>
-          val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
-          val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
           // Deduplicate conflicting attributes if any.
-          dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
+          val values = getValueExpression(value)
+          val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
+          val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
+          val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
+          Join(outerPlan, newSub, LeftSemi, joinCond)
         case (p, Not(In(value, Seq(ListQuery(sub, conditions, _, _))))) =>
           // This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr
           // Construct the condition. A NULL in one of the conditions is regarded as a positive
@@ -109,7 +131,10 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
 
           // Note that this will almost certainly be planned as a Broadcast Nested Loop join.
           // Use EXISTS if performance matters to you.
-          val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
+          val values = getValueExpression(value)
+          // Deduplicate conflicting attributes if any.
+          val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
+          val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
           val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions, p)
           // Expand the NOT IN expression with the NULL-aware semantic
           // to its full form. That is from:
@@ -123,8 +148,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
           // will have the final conditions in the LEFT ANTI as
           // (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2)
           val pairs = (joinConds.map(c => Or(c, IsNull(c))) ++ conditions).reduceLeft(And)
-          // Deduplicate conflicting attributes if any.
-          dedupJoin(Join(outerPlan, sub, LeftAnti, Option(pairs)))
+          Join(outerPlan, newSub, LeftAnti, Option(pairs))
         case (p, predicate) =>
           val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p)
           Project(p.output, Filter(newCond.get, inputPlan))
@@ -145,16 +169,17 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       e transformUp {
         case Exists(sub, conditions, _) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
-          // Deduplicate conflicting attributes if any.
-          newPlan = dedupJoin(
-            Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
+          newPlan =
+            buildJoin(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
           exists
         case In(value, Seq(ListQuery(sub, conditions, _, _))) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
-          val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
-          val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
+          val values = getValueExpression(value)
           // Deduplicate conflicting attributes if any.
-          newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions))
+          val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values))
+          val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
+          val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
+          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions)
           exists
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index acef62d..74c17ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -970,4 +970,40 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
         Row("3", "b") :: Row("4", "b") :: Nil)
     }
   }
+
+  test("SPARK-26078: deduplicate fake self joins for IN subqueries") {
+    withTempView("a", "b") {
+      Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("a")
+      Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("b")
+
+      val df1 = spark.sql(
+        """
+          |SELECT id,num,source FROM (
+          |  SELECT id, num, 'a' as source FROM a
+          |  UNION ALL
+          |  SELECT id, num, 'b' as source FROM b
+          |) AS c WHERE c.id IN (SELECT id FROM b WHERE num = 2)
+        """.stripMargin)
+      checkAnswer(df1, Seq(Row("a", 2, "a"), Row("a", 2, "b")))
+      val df2 = spark.sql(
+        """
+          |SELECT id,num,source FROM (
+          |  SELECT id, num, 'a' as source FROM a
+          |  UNION ALL
+          |  SELECT id, num, 'b' as source FROM b
+          |) AS c WHERE c.id NOT IN (SELECT id FROM b WHERE num = 2)
+        """.stripMargin)
+      checkAnswer(df2, Seq(Row("b", 1, "a"), Row("b", 1, "b")))
+      val df3 = spark.sql(
+        """
+          |SELECT id,num,source FROM (
+          |  SELECT id, num, 'a' as source FROM a
+          |  UNION ALL
+          |  SELECT id, num, 'b' as source FROM b
+          |) AS c WHERE c.id IN (SELECT id FROM b WHERE num = 2) OR
+          |c.id IN (SELECT id FROM b WHERE num = 3)
+        """.stripMargin)
+      checkAnswer(df3, Seq(Row("a", 2, "a"), Row("a", 2, "b")))
+    }
+  }
 }
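
A distilled sketch of the core technique in this patch, for readers skimming
the diff: dedupSubqueryOnSelfJoin re-aliases every subquery output attribute
that also occurs on the outer side, so the equality conditions built from the
IN values reference distinct attribute ids. The helper name dedupAgainstOuter
below is hypothetical; it mirrors the aliasing logic of the patch, minus the
IN-values bookkeeping and the conflicting-condition check.

    import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

    def dedupAgainstOuter(outer: LogicalPlan, sub: LogicalPlan): LogicalPlan = {
      // Attributes the subquery shares with the outer plan.
      val duplicates = outer.outputSet.intersect(sub.outputSet)
      if (duplicates.isEmpty) {
        sub
      } else {
        // Alias(...)() mints a fresh expression id for each duplicate, so a
        // join built on the projected output never compares an attribute
        // with itself.
        val rewrites = AttributeMap(duplicates.map { dup =>
          dup -> Alias(dup, dup.toString)()
        }.toSeq)
        Project(sub.output.map(ref => rewrites.getOrElse(ref, ref)), sub)
      }
    }

The IN rewrite then zips the outer values with the deduplicated subquery
output, values.zip(newSub.output).map(EqualTo.tupled), so each generated
equality is a genuine cross-plan comparison rather than id#3 = id#3.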

