You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/14 00:57:16 UTC

[GitHub] [spark] amaliujia commented on a diff in pull request #38135: [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates

amaliujia commented on code in PR #38135:
URL: https://github.com/apache/spark/pull/38135#discussion_r995236973


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala:
##########
@@ -881,11 +881,10 @@ class AnalysisErrorSuite extends AnalysisTest {
       ($"a" + $"c" === $"b", "(a#x + outer(c#x)) = b#x"),
       (And($"a" === $"c", Cast($"d", IntegerType) === $"c"), "CAST(d#x AS INT) = outer(c#x)"))
     conditions.foreach { case (cond, msg) =>
-      val plan = Project(
-        ScalarSubquery(
+      val plan = Filter(

Review Comment:
   Nit: looks like this line of `Project` -> `Filter` change is not necessary.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1052,7 +1052,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     //      1     | 2 | 4
     // and the plan after rewrite will give the original query incorrect results.
     def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = {
-      if (predicates.nonEmpty) {
+      // Correlated non-equality predicates are only supported with the decorrelate
+      // inner query framework. Currently we only use this new framework for scalar
+      // and lateral subqueries.
+      val allowNonEqualityPredicates =
+        SQLConf.get.decorrelateInnerQueryEnabled && (isScalar || isLateral)
+      if (!allowNonEqualityPredicates && predicates.nonEmpty) {

Review Comment:
   Sorry I have been missing context:
   
   After the non-equality predicates are supported, what are the left gap? I assuming all the predicates are supported now?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1052,7 +1052,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     //      1     | 2 | 4
     // and the plan after rewrite will give the original query incorrect results.
     def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = {
-      if (predicates.nonEmpty) {
+      // Correlated non-equality predicates are only supported with the decorrelate
+      // inner query framework. Currently we only use this new framework for scalar
+      // and lateral subqueries.
+      val allowNonEqualityPredicates =
+        SQLConf.get.decorrelateInnerQueryEnabled && (isScalar || isLateral)
+      if (!allowNonEqualityPredicates && predicates.nonEmpty) {

Review Comment:
   oh you have an example below  which makes sense:
   ```
   -- Correlated equality predicates that are not supported after SPARK-35080
   SELECT c, (
       SELECT count(*)
       FROM (VALUES ('ab'), ('abc'), ('bc')) t2(c)
       WHERE t1.c = substring(t2.c, 1, 1)
   ) FROM (VALUES ('a'), ('b')) t1(c);
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -2044,19 +2034,13 @@ class SubquerySuite extends QueryTest
           |FROM (SELECT CAST(c1 AS STRING) a FROM t1)
           |""".stripMargin),
         Row(5) :: Row(null) :: Nil)
-      val exception1 = intercept[AnalysisException] {
-        sql(
-          """SELECT (SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a)
-            |FROM (SELECT CAST(c1 AS SHORT) a FROM t1)""".stripMargin)
-      }
-      checkErrorMatchPVals(
-        exception1,
-        errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-          "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
-        parameters = Map("treeNode" -> "(?s).*"),
-        sqlState = None,
-        context = ExpectedContext(
-          fragment = "SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a", start = 8, stop = 57))
+      // SPARK-36114: we now allow non-safe cast expressions in correlated predicates.
+      val df = sql(
+        """SELECT (SELECT SUM(c2) FROM t2 WHERE CAST(c1 AS SHORT) = a)
+          |FROM (SELECT CAST(c1 AS SHORT) a FROM t1)
+          |""".stripMargin)
+      checkAnswer(df, Row(5) :: Row(null) :: Nil)
+      checkNumJoins(df.queryExecution.optimizedPlan, 2)

Review Comment:
   I am missing context here: why need to check `NumJoins` only for this case? I did a code search and seems like other test cases in this suite do not care `NumJoins`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org