You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/04/20 20:35:54 UTC

spark git commit: [SPARK-20334][SQL] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references.

Repository: spark
Updated Branches:
  refs/heads/master b2ebadfd5 -> d95e4d9d6


[SPARK-20334][SQL] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references.

## What changes were proposed in this pull request?
Address a follow up in [comment](https://github.com/apache/spark/pull/16954#discussion_r105718880)
Currently, subqueries with correlated predicates containing an aggregate expression that mixes outer references and local references generate a codegen error like the following:

```SQL
SELECT t1a
FROM   t1
GROUP  BY 1
HAVING EXISTS (SELECT 1
               FROM  t2
               WHERE t2a < min(t1a + t2a));
```
Exception snippet.
```
Cannot evaluate expression: min((input[0, int, false] + input[4, int, false]))
	at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226)
	at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103)

```
After this PR, a better error message is issued.
```
org.apache.spark.sql.AnalysisException
Error in query: Found an aggregate expression in a correlated
predicate that has both outer and local references, which is not supported yet.
Aggregate expression: min((t1.`t1a` + t2.`t2a`)),
Outer references: t1.`t1a`,
Local references: t2.`t2a`.;
```
## How was this patch tested?
Added tests in SQLQueryTestSuite.

Author: Dilip Biswal <db...@us.ibm.com>

Closes #17636 from dilipbiswal/subquery_followup1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d95e4d9d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d95e4d9d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d95e4d9d

Branch: refs/heads/master
Commit: d95e4d9d6a9705c534549add6d4a73d554e47274
Parents: b2ebadf
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Thu Apr 20 22:35:48 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Apr 20 22:35:48 2017 +0200

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 49 +++++++---
 .../negative-cases/invalid-correlation.sql      | 74 ++++++++++-----
 .../negative-cases/invalid-correlation.sql.out  | 96 +++++++++++++++-----
 .../org/apache/spark/sql/SubquerySuite.scala    | 23 ++++-
 4 files changed, 181 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d95e4d9d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 175bfb3..eafeb4a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1204,6 +1204,28 @@ class Analyzer(
     private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
       val outerReferences = ArrayBuffer.empty[Expression]
 
+      // Validate that correlated aggregate expressions do not contain a mixture
+      // of outer and local references.
+      def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
+        expr.foreach {
+          case a: AggregateExpression if containsOuter(a) =>
+            val outer = a.collect { case OuterReference(e) => e.toAttribute }
+            val local = a.references -- outer
+            if (local.nonEmpty) {
+              val msg =
+                s"""
+                   |Found an aggregate expression in a correlated predicate that has both
+                   |outer and local references, which is not supported yet.
+                   |Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql},
+                   |Outer references: ${outer.map(_.sql).mkString(", ")},
+                   |Local references: ${local.map(_.sql).mkString(", ")}.
+                 """.stripMargin.replace("\n", " ").trim()
+              failAnalysis(msg)
+            }
+          case _ =>
+        }
+      }
+
       // Make sure a plan's subtree does not contain outer references
       def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
         if (hasOuterReferences(p)) {
@@ -1211,9 +1233,12 @@ class Analyzer(
         }
       }
 
-      // Make sure a plan's expressions do not contain outer references
-      def failOnOuterReference(p: LogicalPlan): Unit = {
-        if (p.expressions.exists(containsOuter)) {
+      // Make sure a plan's expressions do not contain :
+      // 1. Aggregate expressions that have mixture of outer and local references.
+      // 2. Expressions containing outer references on plan nodes other than Filter.
+      def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
+        p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
+        if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) {
           failAnalysis(
             "Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
               s"clauses:\n$p")
@@ -1283,9 +1308,9 @@ class Analyzer(
         // These operators can be anywhere in a correlated subquery.
         // so long as they do not host outer references in the operators.
         case s: Sort =>
-          failOnOuterReference(s)
+          failOnInvalidOuterReference(s)
         case r: RepartitionByExpression =>
-          failOnOuterReference(r)
+          failOnInvalidOuterReference(r)
 
         // Category 3:
         // Filter is one of the two operators allowed to host correlated expressions.
@@ -1299,6 +1324,8 @@ class Analyzer(
             case _: EqualTo | _: EqualNullSafe => false
             case _ => true
           }
+
+          failOnInvalidOuterReference(f)
           // The aggregate expressions are treated in a special way by getOuterReferences. If the
           // aggregate expression contains only outer reference attributes then the entire aggregate
           // expression is isolated as an OuterReference.
@@ -1308,7 +1335,7 @@ class Analyzer(
         // Project cannot host any correlated expressions
         // but can be anywhere in a correlated subquery.
         case p: Project =>
-          failOnOuterReference(p)
+          failOnInvalidOuterReference(p)
 
         // Aggregate cannot host any correlated expressions
         // It can be on a correlation path if the correlation contains
@@ -1316,7 +1343,7 @@ class Analyzer(
         // It cannot be on a correlation path if the correlation has
         // non-equality correlated predicates.
         case a: Aggregate =>
-          failOnOuterReference(a)
+          failOnInvalidOuterReference(a)
           failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)
 
         // Join can host correlated expressions.
@@ -1324,7 +1351,7 @@ class Analyzer(
           joinType match {
             // Inner join, like Filter, can be anywhere.
             case _: InnerLike =>
-              failOnOuterReference(j)
+              failOnInvalidOuterReference(j)
 
             // Left outer join's right operand cannot be on a correlation path.
             // LeftAnti and ExistenceJoin are special cases of LeftOuter.
@@ -1335,12 +1362,12 @@ class Analyzer(
             // Any correlated references in the subplan
             // of the right operand cannot be pulled up.
             case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
-              failOnOuterReference(j)
+              failOnInvalidOuterReference(j)
               failOnOuterReferenceInSubTree(right)
 
             // Likewise, Right outer join's left operand cannot be on a correlation path.
             case RightOuter =>
-              failOnOuterReference(j)
+              failOnInvalidOuterReference(j)
               failOnOuterReferenceInSubTree(left)
 
             // Any other join types not explicitly listed above,
@@ -1356,7 +1383,7 @@ class Analyzer(
         // Note:
         // Generator with join=false is treated as Category 4.
         case g: Generate if g.join =>
-          failOnOuterReference(g)
+          failOnInvalidOuterReference(g)
 
         // Category 4: Any other operators not in the above 3 categories
         // cannot be on a correlation path, that is they are allowed only

http://git-wip-us.apache.org/repos/asf/spark/blob/d95e4d9d/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql
index cf93c5a..e22cade 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql
@@ -1,42 +1,72 @@
 -- The test file contains negative test cases
 -- of invalid queries where error messages are expected.
 
-create temporary view t1 as select * from values
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
   (1, 2, 3)
-as t1(t1a, t1b, t1c);
+AS t1(t1a, t1b, t1c);
 
-create temporary view t2 as select * from values
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
   (1, 0, 1)
-as t2(t2a, t2b, t2c);
+AS t2(t2a, t2b, t2c);
 
-create temporary view t3 as select * from values
+CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
   (3, 1, 2)
-as t3(t3a, t3b, t3c);
+AS t3(t3a, t3b, t3c);
 
 -- TC 01.01
 -- The column t2b in the SELECT of the subquery is invalid
 -- because it is neither an aggregate function nor a GROUP BY column.
-select t1a, t2b
-from   t1, t2
-where  t1b = t2c
-and    t2b = (select max(avg)
-              from   (select   t2b, avg(t2b) avg
-                      from     t2
-                      where    t2a = t1.t1b
+SELECT t1a, t2b
+FROM   t1, t2
+WHERE  t1b = t2c
+AND    t2b = (SELECT max(avg)
+              FROM   (SELECT   t2b, avg(t2b) avg
+                      FROM     t2
+                      WHERE    t2a = t1.t1b
                      )
              )
 ;
 
 -- TC 01.02
 -- Invalid due to the column t2b not part of the output from table t2.
-select *
-from   t1
-where  t1a in (select   min(t2a)
-               from     t2
-               group by t2c
-               having   t2c in (select   max(t3c)
-                                from     t3
-                                group by t3b
-                                having   t3b > t2b ))
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT   min(t2a)
+               FROM     t2
+               GROUP BY t2c
+               HAVING   t2c IN (SELECT   max(t3c)
+                                FROM     t3
+                                GROUP BY t3b
+                                HAVING   t3b > t2b ))
 ;
 
+-- TC 01.03
+-- Invalid due to mixture of outer and local references under an AggregateExpression
+-- in a correlated predicate
+SELECT t1a 
+FROM   t1
+GROUP  BY 1
+HAVING EXISTS (SELECT 1 
+               FROM  t2
+               WHERE t2a < min(t1a + t2a));
+
+-- TC 01.04
+-- Invalid due to mixture of outer and local references under an AggregateExpression
+SELECT t1a 
+FROM   t1
+WHERE  t1a IN (SELECT t2a 
+               FROM   t2
+               WHERE  EXISTS (SELECT 1 
+                              FROM   t3
+                              GROUP BY 1
+                              HAVING min(t2a + t3a) > 1));
+
+-- TC 01.05
+-- Invalid due to outer reference appearing in projection list
+SELECT t1a 
+FROM   t1
+WHERE  t1a IN (SELECT t2a 
+               FROM   t2
+               WHERE  EXISTS (SELECT min(t2a) 
+                              FROM   t3));
+

http://git-wip-us.apache.org/repos/asf/spark/blob/d95e4d9d/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
index f7bbb35..e4b1a2d 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
@@ -1,11 +1,11 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 5
+-- Number of queries: 8
 
 
 -- !query 0
-create temporary view t1 as select * from values
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
   (1, 2, 3)
-as t1(t1a, t1b, t1c)
+AS t1(t1a, t1b, t1c)
 -- !query 0 schema
 struct<>
 -- !query 0 output
@@ -13,9 +13,9 @@ struct<>
 
 
 -- !query 1
-create temporary view t2 as select * from values
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
   (1, 0, 1)
-as t2(t2a, t2b, t2c)
+AS t2(t2a, t2b, t2c)
 -- !query 1 schema
 struct<>
 -- !query 1 output
@@ -23,9 +23,9 @@ struct<>
 
 
 -- !query 2
-create temporary view t3 as select * from values
+CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
   (3, 1, 2)
-as t3(t3a, t3b, t3c)
+AS t3(t3a, t3b, t3c)
 -- !query 2 schema
 struct<>
 -- !query 2 output
@@ -33,13 +33,13 @@ struct<>
 
 
 -- !query 3
-select t1a, t2b
-from   t1, t2
-where  t1b = t2c
-and    t2b = (select max(avg)
-              from   (select   t2b, avg(t2b) avg
-                      from     t2
-                      where    t2a = t1.t1b
+SELECT t1a, t2b
+FROM   t1, t2
+WHERE  t1b = t2c
+AND    t2b = (SELECT max(avg)
+              FROM   (SELECT   t2b, avg(t2b) avg
+                      FROM     t2
+                      WHERE    t2a = t1.t1b
                      )
              )
 -- !query 3 schema
@@ -50,17 +50,67 @@ grouping expressions sequence is empty, and 't2.`t2b`' is not an aggregate funct
 
 
 -- !query 4
-select *
-from   t1
-where  t1a in (select   min(t2a)
-               from     t2
-               group by t2c
-               having   t2c in (select   max(t3c)
-                                from     t3
-                                group by t3b
-                                having   t3b > t2b ))
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT   min(t2a)
+               FROM     t2
+               GROUP BY t2c
+               HAVING   t2c IN (SELECT   max(t3c)
+                                FROM     t3
+                                GROUP BY t3b
+                                HAVING   t3b > t2b ))
 -- !query 4 schema
 struct<>
 -- !query 4 output
 org.apache.spark.sql.AnalysisException
 resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x in operator !Filter t2c#x IN (list#x [t2b#x]);
+
+
+-- !query 5
+SELECT t1a 
+FROM   t1
+GROUP  BY 1
+HAVING EXISTS (SELECT 1 
+               FROM  t2
+               WHERE t2a < min(t1a + t2a))
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)), Outer references: t1.`t1a`, Local references: t2.`t2a`.;
+
+
+-- !query 6
+SELECT t1a 
+FROM   t1
+WHERE  t1a IN (SELECT t2a 
+               FROM   t2
+               WHERE  EXISTS (SELECT 1 
+                              FROM   t3
+                              GROUP BY 1
+                              HAVING min(t2a + t3a) > 1))
+-- !query 6 schema
+struct<>
+-- !query 6 output
+org.apache.spark.sql.AnalysisException
+Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t2.`t2a` + t3.`t3a`)), Outer references: t2.`t2a`, Local references: t3.`t3a`.;
+
+
+-- !query 7
+SELECT t1a 
+FROM   t1
+WHERE  t1a IN (SELECT t2a 
+               FROM   t2
+               WHERE  EXISTS (SELECT min(t2a) 
+                              FROM   t3))
+-- !query 7 schema
+struct<>
+-- !query 7 output
+org.apache.spark.sql.AnalysisException
+Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses:
+Aggregate [min(outer(t2a#x)) AS min(outer())#x]
++- SubqueryAlias t3
+   +- Project [t3a#x, t3b#x, t3c#x]
+      +- SubqueryAlias t3
+         +- LocalRelation [t3a#x, t3b#x, t3c#x]
+;

http://git-wip-us.apache.org/repos/asf/spark/blob/d95e4d9d/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 0f0199c..131abf7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -822,12 +822,25 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(
         sql(
           """
-          | select c2
-          | from t1
-          | where exists (select *
-          |               from t2 lateral view explode(arr_c2) q as c2
-                          where t1.c1 = t2.c1)""".stripMargin),
+          | SELECT c2
+          | FROM t1
+          | WHERE EXISTS (SELECT *
+          |               FROM t2 LATERAL VIEW explode(arr_c2) q AS c2
+                          WHERE t1.c1 = t2.c1)""".stripMargin),
         Row(1) :: Row(0) :: Nil)
+
+      val msg1 = intercept[AnalysisException] {
+        sql(
+          """
+            | SELECT c1
+            | FROM t2
+            | WHERE EXISTS (SELECT *
+            |               FROM t1 LATERAL VIEW explode(t2.arr_c2) q AS c2
+            |               WHERE t1.c1 = t2.c1)
+          """.stripMargin)
+      }
+      assert(msg1.getMessage.contains(
+        "Expressions referencing the outer query are not supported outside of WHERE/HAVING"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org