Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/06 18:20:14 UTC

[GitHub] [spark] allisonwang-db opened a new pull request, #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

allisonwang-db opened a new pull request, #38135:
URL: https://github.com/apache/spark/pull/38135

   
   ### What changes were proposed in this pull request?
   This PR adds support for correlated non-equality predicates in subqueries. It uses the DecorrelateInnerQuery framework to decorrelate such subqueries: DecorrelateInnerQuery inserts domain joins into the query plan, and the rule RewriteCorrelatedScalarSubquery rewrites the domain joins into actual joins with the outer query. Only scalar and lateral subqueries are covered, because those are the only subquery kinds that currently use this framework.
   
   Note that correlated non-equality predicates can lead to query plans with non-equality join conditions, which may be planned as a broadcast nested loop join or a cartesian product.
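
As a rough illustration of the rewrite described above, the sketch below models the query `SELECT (SELECT min(c2) FROM t2 WHERE t1.c1 > t2.c1) FROM t1` on plain Scala collections. It is a toy model and not Catalyst code: the sample rows, the object name `DomainJoinSketch`, and the four-step breakdown are assumptions made for illustration. It only shows why a domain (inner) join followed by a left outer join can reproduce the per-row semantics of the correlated subquery.

```scala
// Toy model on plain Scala collections; this is NOT how Catalyst represents plans.
object DomainJoinSketch {
  // Hypothetical sample rows: t1(c1) and t2(c1, c2).
  val t1: Seq[Int] = Seq(0, 1, 2, 2)
  val t2: Seq[(Int, Int)] = Seq((0, 10), (1, 20), (1, 5))

  // Reference semantics: evaluate the subquery once per outer row.
  // An empty match set yields NULL, modeled here as None.
  def perRow: Seq[Option[Int]] =
    t1.map { outer =>
      val matches = t2.collect { case (c1, c2) if outer > c1 => c2 }
      if (matches.isEmpty) None else Some(matches.min)
    }

  // Decorrelated form: domain join, aggregate, then left outer join.
  def decorrelated: Seq[Option[Int]] = {
    // 1. Domain: the distinct values of the correlated outer column.
    val domain = t1.distinct
    // 2. Domain (inner) join with t2 on the non-equality predicate.
    val joined = for { d <- domain; row <- t2 if d > row._1 } yield (d, row._2)
    // 3. Aggregate min(c2) per domain value.
    val minByDomain: Map[Int, Int] =
      joined.groupBy(_._1).map { case (d, rows) => d -> rows.map(_._2).min }
    // 4. Left outer join back to t1; outer rows without a match get None.
    t1.map(minByDomain.get)
  }
}
```

Step 2 is where the non-equality join condition appears, which is why the resulting plan may need a broadcast nested loop join or a cartesian product.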
   
   ### Why are the changes needed?
   To improve subquery support in Spark.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. Before this PR, Spark did not allow correlated non-equality predicates in subqueries.
   For example:
   ```sql
   SELECT (SELECT min(c2) FROM t2 WHERE t1.c1 > t2.c1) FROM t1
   ```
   Previously, this query threw an exception: `Correlated column is not allowed in a non-equality predicate`
   
   After this PR, this query can run successfully.
   
   ### How was this patch tested?
   Unit tests and SQL query tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] allisonwang-db commented on a diff in pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
allisonwang-db commented on code in PR #38135:
URL: https://github.com/apache/spark/pull/38135#discussion_r1000040515


##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -2044,19 +2034,13 @@ class SubquerySuite extends QueryTest
           |FROM (SELECT CAST(c1 AS STRING) a FROM t1)
           |""".stripMargin),
         Row(5) :: Row(null) :: Nil)
-      val exception1 = intercept[AnalysisException] {
-        sql(
-          """SELECT (SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a)
-            |FROM (SELECT CAST(c1 AS SHORT) a FROM t1)""".stripMargin)
-      }
-      checkErrorMatchPVals(
-        exception1,
-        errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-          "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
-        parameters = Map("treeNode" -> "(?s).*"),
-        sqlState = None,
-        context = ExpectedContext(
-          fragment = "SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a", start = 8, stop = 57))
+      // SPARK-36114: we now allow non-safe cast expressions in correlated predicates.
+      val df = sql(
+        """SELECT (SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a)
+          |FROM (SELECT CAST(c1 AS SHORT) a FROM t1)
+          |""".stripMargin)
+      checkAnswer(df, Row(5) :: Row(null) :: Nil)
+      checkNumJoins(df.queryExecution.optimizedPlan, 2)

Review Comment:
   This verifies that the optimized plan has one left outer join and one domain (inner) join.
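
The body of `checkNumJoins` is not shown in this thread, so the following is only a sketch of what such a check might do, written against a hypothetical toy plan type rather than Catalyst's `LogicalPlan`. The names `Plan`, `Scan`, `Aggregate`, `Join`, and `countJoins` are all assumptions for illustration, and the example tree simply mirrors the shape described in the comment above (one left outer join over one inner domain join).

```scala
// Hypothetical toy plan nodes; these are NOT Catalyst classes.
sealed trait Plan { def children: Seq[Plan] }
case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }
case class Aggregate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
case class Join(left: Plan, right: Plan, joinType: String) extends Plan {
  val children: Seq[Plan] = Seq(left, right)
}

object JoinCount {
  // Count Join nodes anywhere in the tree with a simple recursive walk.
  def countJoins(plan: Plan): Int = {
    val here = plan match {
      case _: Join => 1
      case _ => 0
    }
    here + plan.children.map(countJoins).sum
  }

  // Assumed shape: outer query left-outer-joined with an aggregate
  // computed over the inner domain join.
  val optimized: Plan =
    Join(Scan("t1"), Aggregate(Join(Scan("domain"), Scan("t2"), "Inner")), "LeftOuter")
}
```

On a real Catalyst plan, a comparable count could presumably be obtained with `plan.collect { case j: Join => j }.size`.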





[GitHub] [spark] cloud-fan closed pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates
URL: https://github.com/apache/spark/pull/38135




[GitHub] [spark] allisonwang-db commented on a diff in pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
allisonwang-db commented on code in PR #38135:
URL: https://github.com/apache/spark/pull/38135#discussion_r1000039916


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala:
##########
@@ -881,11 +881,10 @@ class AnalysisErrorSuite extends AnalysisTest {
       ($"a" + $"c" === $"b", "(a#x + outer(c#x)) = b#x"),
       (And($"a" === $"c", Cast($"d", IntegerType) === $"c"), "CAST(d#x AS INT) = outer(c#x)"))
     conditions.foreach { case (cond, msg) =>
-      val plan = Project(
-        ScalarSubquery(
+      val plan = Filter(

Review Comment:
   You are right. Project can host IN/EXISTS now.





[GitHub] [spark] allisonwang-db commented on pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
allisonwang-db commented on PR #38135:
URL: https://github.com/apache/spark/pull/38135#issuecomment-1275246395

   cc @cloud-fan @dtenedor




[GitHub] [spark] cloud-fan commented on pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38135:
URL: https://github.com/apache/spark/pull/38135#issuecomment-1284096692

   Is this only for scalar subqueries?




[GitHub] [spark] cloud-fan commented on a diff in pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38135:
URL: https://github.com/apache/spark/pull/38135#discussion_r1000465237


##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -2044,19 +2034,13 @@ class SubquerySuite extends QueryTest
           |FROM (SELECT CAST(c1 AS STRING) a FROM t1)
           |""".stripMargin),
         Row(5) :: Row(null) :: Nil)
-      val exception1 = intercept[AnalysisException] {
-        sql(
-          """SELECT (SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a)
-            |FROM (SELECT CAST(c1 AS SHORT) a FROM t1)""".stripMargin)
-      }
-      checkErrorMatchPVals(
-        exception1,
-        errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-          "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
-        parameters = Map("treeNode" -> "(?s).*"),
-        sqlState = None,
-        context = ExpectedContext(
-          fragment = "SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a", start = 8, stop = 57))
+      // SPARK-36114: we now allow non-safe cast expressions in correlated predicates.
+      val df = sql(
+        """SELECT (SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a)
+          |FROM (SELECT CAST(c1 AS SHORT) a FROM t1)
+          |""".stripMargin)
+      checkAnswer(df, Row(5) :: Row(null) :: Nil)
+      checkNumJoins(df.queryExecution.optimizedPlan, 2)

Review Comment:
   Can we check the number of joins for the safe cast case as well?





[GitHub] [spark] cloud-fan commented on a diff in pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38135:
URL: https://github.com/apache/spark/pull/38135#discussion_r1000464929


##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -2139,19 +2129,14 @@ class SubquerySuite extends QueryTest
           |FROM (SELECT CAST(c1 AS STRING) a FROM t1)
           |""".stripMargin),
         Row(5) :: Row(null) :: Nil)
-      val exception1 = intercept[AnalysisException] {
-        sql(
-          """SELECT (SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a)
-            |FROM (SELECT CAST(c1 AS SHORT) a FROM t1)""".stripMargin)
-      }
-      checkErrorMatchPVals(
-        exception1,
-        errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-          "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
-        parameters = Map("treeNode" -> "(?s).*"),
-        sqlState = None,
-        context = ExpectedContext(
-          fragment = "SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a", start = 8, stop = 57))
+      // SPARK-36114: we now allow non-safe cast expressions in correlated predicates.

Review Comment:
   Can we change the test name? `SPARK-38180, SPARK-36114: cast expressions in correlated equality conditions`





[GitHub] [spark] allisonwang-db commented on pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
allisonwang-db commented on PR #38135:
URL: https://github.com/apache/spark/pull/38135#issuecomment-1284741006

   > Is this only for scalar subqueries?
   
   It covers scalar and lateral subqueries. IN and EXISTS subqueries are not supported because they do not currently use the new decorrelation framework.
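
The scoping described here can be sketched as a small predicate. This is a simplified stand-in for the check added to `CheckAnalysis.scala` in this PR, not the actual code: `SubqueryKind`, its cases, and `allowsNonEqualityPredicates` are hypothetical names, and the configuration flag is passed in as a plain boolean instead of being read from `SQLConf`.

```scala
object CorrelationGate {
  // Hypothetical enumeration of subquery kinds, for illustration only.
  sealed trait SubqueryKind
  case object Scalar extends SubqueryKind
  case object Lateral extends SubqueryKind
  case object InSubquery extends SubqueryKind
  case object Exists extends SubqueryKind

  // Non-equality correlated predicates are allowed only when the decorrelation
  // framework is enabled AND the subquery kind is one that uses it.
  def allowsNonEqualityPredicates(
      kind: SubqueryKind,
      decorrelateInnerQueryEnabled: Boolean): Boolean =
    decorrelateInnerQueryEnabled && (kind == Scalar || kind == Lateral)
}
```

With the flag disabled, every kind falls back to the old behavior and the analysis error is still raised.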




[GitHub] [spark] amaliujia commented on a diff in pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38135:
URL: https://github.com/apache/spark/pull/38135#discussion_r995236973


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala:
##########
@@ -881,11 +881,10 @@ class AnalysisErrorSuite extends AnalysisTest {
       ($"a" + $"c" === $"b", "(a#x + outer(c#x)) = b#x"),
       (And($"a" === $"c", Cast($"d", IntegerType) === $"c"), "CAST(d#x AS INT) = outer(c#x)"))
     conditions.foreach { case (cond, msg) =>
-      val plan = Project(
-        ScalarSubquery(
+      val plan = Filter(

Review Comment:
   Nit: it looks like this `Project` -> `Filter` change is not necessary.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1052,7 +1052,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     //      1     | 2 | 4
     // and the plan after rewrite will give the original query incorrect results.
     def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = {
-      if (predicates.nonEmpty) {
+      // Correlated non-equality predicates are only supported with the decorrelate
+      // inner query framework. Currently we only use this new framework for scalar
+      // and lateral subqueries.
+      val allowNonEqualityPredicates =
+        SQLConf.get.decorrelateInnerQueryEnabled && (isScalar || isLateral)
+      if (!allowNonEqualityPredicates && predicates.nonEmpty) {

Review Comment:
   Sorry, I am missing some context:
   
   Once non-equality predicates are supported, what gaps remain? I assume all predicates are supported now?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1052,7 +1052,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     //      1     | 2 | 4
     // and the plan after rewrite will give the original query incorrect results.
     def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = {
-      if (predicates.nonEmpty) {
+      // Correlated non-equality predicates are only supported with the decorrelate
+      // inner query framework. Currently we only use this new framework for scalar
+      // and lateral subqueries.
+      val allowNonEqualityPredicates =
+        SQLConf.get.decorrelateInnerQueryEnabled && (isScalar || isLateral)
+      if (!allowNonEqualityPredicates && predicates.nonEmpty) {

Review Comment:
   Oh, you have an example below, which makes sense:
   ```
   -- Correlated equality predicates that are not supported after SPARK-35080
   SELECT c, (
       SELECT count(*)
       FROM (VALUES ('ab'), ('abc'), ('bc')) t2(c)
       WHERE t1.c = substring(t2.c, 1, 1)
   ) FROM (VALUES ('a'), ('b')) t1(c);
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -2044,19 +2034,13 @@ class SubquerySuite extends QueryTest
           |FROM (SELECT CAST(c1 AS STRING) a FROM t1)
           |""".stripMargin),
         Row(5) :: Row(null) :: Nil)
-      val exception1 = intercept[AnalysisException] {
-        sql(
-          """SELECT (SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a)
-            |FROM (SELECT CAST(c1 AS SHORT) a FROM t1)""".stripMargin)
-      }
-      checkErrorMatchPVals(
-        exception1,
-        errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-          "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
-        parameters = Map("treeNode" -> "(?s).*"),
-        sqlState = None,
-        context = ExpectedContext(
-          fragment = "SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a", start = 8, stop = 57))
+      // SPARK-36114: we now allow non-safe cast expressions in correlated predicates.
+      val df = sql(
+        """SELECT (SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a)
+          |FROM (SELECT CAST(c1 AS SHORT) a FROM t1)
+          |""".stripMargin)
+      checkAnswer(df, Row(5) :: Row(null) :: Nil)
+      checkNumJoins(df.queryExecution.optimizedPlan, 2)

Review Comment:
   I am missing context here: why do we need to check `NumJoins` only for this case? I did a code search, and it seems that other test cases in this suite do not check `NumJoins`.





[GitHub] [spark] cloud-fan commented on pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38135:
URL: https://github.com/apache/spark/pull/38135#issuecomment-1288354293

   thanks, merging to master!

