You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/17 01:57:38 UTC

spark git commit: [SPARK-23095][SQL] Decorrelation of scalar subquery fails with java.util.NoSuchElementException

Repository: spark
Updated Branches:
  refs/heads/master 5ae333391 -> 0c2ba427b


[SPARK-23095][SQL] Decorrelation of scalar subquery fails with java.util.NoSuchElementException

## What changes were proposed in this pull request?
The following SQL involving scalar correlated query returns a map exception.
``` SQL
SELECT t1a
FROM   t1
WHERE  t1a = (SELECT   count(*)
              FROM     t2
              WHERE    t2c = t1c
              HAVING   count(*) >= 1)
```
``` SQL
key not found: ExprId(278,786682bb-41f9-4bd5-a397-928272cc8e4e)
java.util.NoSuchElementException: key not found: ExprId(278,786682bb-41f9-4bd5-a397-928272cc8e4e)
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:59)
        at org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery$.org$apache$spark$sql$catalyst$optimizer$RewriteCorrelatedScalarSubquery$$evalSubqueryOnZeroTups(subquery.scala:378)
        at org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery$$anonfun$org$apache$spark$sql$catalyst$optimizer$RewriteCorrelatedScalarSubquery$$constructLeftJoins$1.apply(subquery.scala:430)
        at org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery$$anonfun$org$apache$spark$sql$catalyst$optimizer$RewriteCorrelatedScalarSubquery$$constructLeftJoins$1.apply(subquery.scala:426)
```

In this case, after evaluating the HAVING clause "count(*) > 1" statically
against the binding of aggregtation result on empty input, we determine
that this query will not have a the count bug. We should simply return
the evalSubqueryOnZeroTups with empty value.
(Please fill in changes proposed in this fix)

## How was this patch tested?
A new test was added in the Subquery bucket.

Author: Dilip Biswal <db...@us.ibm.com>

Closes #20283 from dilipbiswal/scalar-count-defect.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c2ba427
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c2ba427
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c2ba427

Branch: refs/heads/master
Commit: 0c2ba427bc7323729e6ffb34f1f06a97f0bf0c1d
Parents: 5ae3333
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Wed Jan 17 09:57:30 2018 +0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Jan 17 09:57:30 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/subquery.scala |  5 +-
 .../scalar-subquery-predicate.sql               | 10 ++++
 .../scalar-subquery-predicate.sql.out           | 57 ++++++++++++--------
 3 files changed, 49 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c2ba427/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 2673bea..709db6d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -369,13 +369,14 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
           case ne => (ne.exprId, evalAggOnZeroTups(ne))
         }.toMap
 
-      case _ => sys.error(s"Unexpected operator in scalar subquery: $lp")
+      case _ =>
+        sys.error(s"Unexpected operator in scalar subquery: $lp")
     }
 
     val resultMap = evalPlan(plan)
 
     // By convention, the scalar subquery result is the leftmost field.
-    resultMap(plan.output.head.exprId)
+    resultMap.getOrElse(plan.output.head.exprId, None)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2ba427/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
index fb0d07f..1661209 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
@@ -173,6 +173,16 @@ WHERE  t1a = (SELECT   max(t2a)
               HAVING   count(*) >= 0)
 OR     t1i > '2014-12-31';
 
+-- TC 02.03.01
+SELECT t1a
+FROM   t1
+WHERE  t1a = (SELECT   max(t2a)
+              FROM     t2
+              WHERE    t2c = t1c
+              GROUP BY t2c
+              HAVING   count(*) >= 1)
+OR     t1i > '2014-12-31';
+
 -- TC 02.04
 -- t1 on the right of an outer join
 -- can be reduced to inner join

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2ba427/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
index 8b29300..a2b86db 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 26
+-- Number of queries: 29
 
 
 -- !query 0
@@ -293,6 +293,21 @@ val1d
 
 
 -- !query 19
+SELECT t1a
+FROM   t1
+WHERE  t1a = (SELECT   max(t2a)
+              FROM     t2
+              WHERE    t2c = t1c
+              GROUP BY t2c
+              HAVING   count(*) >= 1)
+OR     t1i > '2014-12-31'
+-- !query 19 schema
+struct<t1a:string>
+-- !query 19 output
+val1c
+val1d
+
+-- !query 22
 SELECT count(t1a)
 FROM   t1 RIGHT JOIN t2
 ON     t1d = t2d
@@ -300,13 +315,13 @@ WHERE  t1a < (SELECT   max(t2a)
               FROM     t2
               WHERE    t2c = t1c
               GROUP BY t2c)
--- !query 19 schema
+-- !query 22 schema
 struct<count(t1a):bigint>
--- !query 19 output
+-- !query 22 output
 7
 
 
--- !query 20
+-- !query 23
 SELECT t1a
 FROM   t1
 WHERE  t1b <= (SELECT   max(t2b)
@@ -317,14 +332,14 @@ AND    t1b >= (SELECT   min(t2b)
                FROM     t2
                WHERE    t2c = t1c
                GROUP BY t2c)
--- !query 20 schema
+-- !query 23 schema
 struct<t1a:string>
--- !query 20 output
+-- !query 23 output
 val1b
 val1c
 
 
--- !query 21
+-- !query 24
 SELECT t1a
 FROM   t1
 WHERE  t1a <= (SELECT   max(t2a)
@@ -338,14 +353,14 @@ WHERE  t1a >= (SELECT   min(t2a)
                FROM     t2
                WHERE    t2c = t1c
                GROUP BY t2c)
--- !query 21 schema
+-- !query 24 schema
 struct<t1a:string>
--- !query 21 output
+-- !query 24 output
 val1b
 val1c
 
 
--- !query 22
+-- !query 25
 SELECT t1a
 FROM   t1
 WHERE  t1a <= (SELECT   max(t2a)
@@ -359,9 +374,9 @@ WHERE  t1a >= (SELECT   min(t2a)
                FROM     t2
                WHERE    t2c = t1c
                GROUP BY t2c)
--- !query 22 schema
+-- !query 25 schema
 struct<t1a:string>
--- !query 22 output
+-- !query 25 output
 val1a
 val1a
 val1b
@@ -372,7 +387,7 @@ val1d
 val1d
 
 
--- !query 23
+-- !query 26
 SELECT t1a
 FROM   t1
 WHERE  t1a <= (SELECT   max(t2a)
@@ -386,16 +401,16 @@ WHERE  t1a >= (SELECT   min(t2a)
                FROM     t2
                WHERE    t2c = t1c
                GROUP BY t2c)
--- !query 23 schema
+-- !query 26 schema
 struct<t1a:string>
--- !query 23 output
+-- !query 26 output
 val1a
 val1b
 val1c
 val1d
 
 
--- !query 24
+-- !query 27
 SELECT t1a
 FROM   t1
 WHERE  t1a <= (SELECT   max(t2a)
@@ -409,13 +424,13 @@ WHERE  t1a >= (SELECT   min(t2a)
                FROM     t2
                WHERE    t2c = t1c
                GROUP BY t2c)
--- !query 24 schema
+-- !query 27 schema
 struct<t1a:string>
--- !query 24 output
+-- !query 27 output
 val1a
 
 
--- !query 25
+-- !query 28
 SELECT   t1a
 FROM     t1
 GROUP BY t1a, t1c
@@ -423,8 +438,8 @@ HAVING   max(t1b) <= (SELECT   max(t2b)
                       FROM     t2
                       WHERE    t2c = t1c
                       GROUP BY t2c)
--- !query 25 schema
+-- !query 28 schema
 struct<t1a:string>
--- !query 25 output
+-- !query 28 output
 val1b
 val1c


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org