You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/11/24 20:08:04 UTC

spark git commit: [SPARK-18578][SQL] Full outer join in correlated subquery returns incorrect results

Repository: spark
Updated Branches:
  refs/heads/master 2dfabec38 -> a367d5ff0


[SPARK-18578][SQL] Full outer join in correlated subquery returns incorrect results

## What changes were proposed in this pull request?

- Raise an AnalysisException when correlated predicates exist in a Full Outer Join (FOJ) operator in a subquery, or in the descendant operators of either operand of that FOJ
- Raise an AnalysisException when correlated predicates exist in a Window operator (a side effect inadvertently introduced by SPARK-17348)

## How was this patch tested?

Ran sql/test and catalyst/test, plus new test cases added to SubquerySuite that show the reported incorrect results.

Author: Nattavut Sutyanyong <ns...@gmail.com>

Closes #16005 from nsyca/FOJ-incorrect.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a367d5ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a367d5ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a367d5ff

Branch: refs/heads/master
Commit: a367d5ff005884322fb8bb43a1cfa4d4bf54b31a
Parents: 2dfabec
Author: Nattavut Sutyanyong <ns...@gmail.com>
Authored: Thu Nov 24 12:07:55 2016 -0800
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Nov 24 12:07:55 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 10 +++++
 .../org/apache/spark/sql/SubquerySuite.scala    | 45 ++++++++++++++++++++
 2 files changed, 55 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a367d5ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0155741..1db4449 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1076,6 +1076,10 @@ class Analyzer(
 
       // Simplify the predicates before pulling them out.
       val transformed = BooleanSimplification(sub) transformUp {
+        // WARNING:
+        // Only Filter can host correlated expressions at this time
+        // Anyone adding a new "case" below needs to add the call to
+        // "failOnOuterReference" to disallow correlated expressions in it.
         case f @ Filter(cond, child) =>
           // Find all predicates with an outer reference.
           val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
@@ -1116,12 +1120,18 @@ class Analyzer(
             a
           }
         case w : Window =>
+          failOnOuterReference(w)
           failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w)
           w
         case j @ Join(left, _, RightOuter, _) =>
           failOnOuterReference(j)
           failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN")
           j
+        // SPARK-18578: Do not allow any correlated predicate
+        // in a Full (Outer) Join operator and its descendants
+        case j @ Join(_, _, FullOuter, _) =>
+          failOnOuterReferenceInSubTree(j, "a FULL OUTER JOIN")
+          j
         case j @ Join(_, right, jt, _) if !jt.isInstanceOf[InnerLike] =>
           failOnOuterReference(j)
           failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")

http://git-wip-us.apache.org/repos/asf/spark/blob/a367d5ff/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index f1dd1c6..73a5394 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -744,4 +744,49 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+  // This restriction applies to
+  // the permutation of { LOJ, ROJ, FOJ } x { EXISTS, IN, scalar subquery }
+  // where correlated predicates appear in right operand of LOJ,
+  // or in left operand of ROJ, or in either operand of FOJ.
+  // The test cases below cover the representatives of the patterns
+  test("Correlated subqueries in outer joins") {
+    withTempView("t1", "t2", "t3") {
+      Seq(1).toDF("c1").createOrReplaceTempView("t1")
+      Seq(2).toDF("c1").createOrReplaceTempView("t2")
+      Seq(1).toDF("c1").createOrReplaceTempView("t3")
+
+      // Left outer join (LOJ) in IN subquery context
+      intercept[AnalysisException] {
+        sql(
+          """
+            | select t1.c1
+            | from   t1
+            | where  1 IN (select 1
+            |              from   t3 left outer join
+            |                     (select c1 from t2 where t1.c1 = 2) t2
+            |                     on t2.c1 = t3.c1)""".stripMargin).collect()
+      }
+      // Right outer join (ROJ) in EXISTS subquery context
+      intercept[AnalysisException] {
+        sql(
+          """
+            | select t1.c1
+            | from   t1
+            | where  exists (select 1
+            |                from   (select c1 from t2 where t1.c1 = 2) t2
+            |                       right outer join t3
+            |                       on t2.c1 = t3.c1)""".stripMargin).collect()
+      }
+      // SPARK-18578: Full outer join (FOJ) in scalar subquery context
+      intercept[AnalysisException] {
+        sql(
+          """
+            | select (select max(1)
+            |         from   (select c1 from  t2 where t1.c1 = 2 and t1.c1=t2.c1) t2
+            |                full join t3
+            |                on t2.c1=t3.c1)
+            | from   t1""".stripMargin).collect()
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org