You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "agubichev (via GitHub)" <gi...@apache.org> on 2023/05/24 23:50:22 UTC

[GitHub] [spark] agubichev opened a new pull request, #41301: [SPARK-43780][SQL] Support correlated references in join predicates

agubichev opened a new pull request, #41301:
URL: https://github.com/apache/spark/pull/41301

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   This PR adds support to subqueries that involve joins with correlated references in join predicates, e.g.
   
   ```
   select * from t0 join lateral (select * from t1 join t2 on t1a = t2a and t1a = t0a);
   ```
   
   (full example in https://issues.apache.org/jira/browse/SPARK-43780)
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   This is a valid SQL that is not yet supported by Spark SQL.
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   Yes, previously unsupported queries become supported.
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   Query and unit tests
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] allisonwang-db commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1290738214


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -4268,6 +4268,14 @@ object SQLConf {
       .checkValue(_ >= 0, "The threshold of cached local relations must not be negative")
       .createWithDefault(64 * 1024 * 1024)
 
+  val DECORRELATE_JOIN_PREDICATE_ENABLED =
+    buildConf("spark.sql.optimizer.decorrelateJoinPredicate.enabled")
+      .internal()
+      .doc("Decorrelate subqueries with correlated references in join predicates.")
+      .version("3.5.0")

Review Comment:
   Should this be `4.0.0`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,74 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            // Given 'condition', computes the tuple of
+            // (correlated, uncorrelated, equalityCond, predicates, equivalences).
+            // 'correlated' and 'uncorrelated' are the conjuncts with (resp. without)
+            // outer (correlated) references. Furthermore, correlated conjuncts are split
+            // into 'equalityCond' (those that are equalities) and all rest ('predicates').
+            // 'equivalences' track equivalent attributes given 'equalityCond'.
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {

Review Comment:
   ```suggestion
               def splitCorrelatedPredicate(
                   condition: Option[Expression],
                   isInnerJoin: Boolean,
                   shouldDecorrelatePredicates: Boolean):
               (Seq[Expression], Seq[Expression], Seq[Expression],
                 Seq[Expression], AttributeMap[Attribute]) = {
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {

Review Comment:
   Thanks for adding the comment! If possible, can we also include a minimal example in this comment?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -4268,6 +4268,14 @@ object SQLConf {
       .checkValue(_ >= 0, "The threshold of cached local relations must not be negative")
       .createWithDefault(64 * 1024 * 1024)
 
+  val DECORRELATE_JOIN_PREDICATE_ENABLED =
+    buildConf("spark.sql.optimizer.decorrelateJoinPredicate.enabled")
+      .internal()
+      .doc("Decorrelate subqueries with correlated references in join predicates.")

Review Comment:
   We should also mention that the decorrelate inner query config



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,74 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            // Given 'condition', computes the tuple of
+            // (correlated, uncorrelated, equalityCond, predicates, equivalences).
+            // 'correlated' and 'uncorrelated' are the conjuncts with (resp. without)
+            // outer (correlated) references. Furthermore, correlated conjuncts are split
+            // into 'equalityCond' (those that are equalities) and all rest ('predicates').
+            // 'equivalences' track equivalent attributes given 'equalityCond'.
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):

Review Comment:
   Can we also mention these two parameters in the comment?



##########
sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql:
##########
@@ -101,6 +101,10 @@ SELECT * FROM t1 WHERE c1 = (SELECT MIN(a) FROM t2, LATERAL (SELECT c1 AS a));
 -- lateral join inside correlated subquery
 SELECT * FROM t1 WHERE c1 = (SELECT MIN(a) FROM t2, LATERAL (SELECT c1 AS a) WHERE c1 = t1.c1);
 
+-- join condition has a correlated reference to the left side of the lateral join
+SELECT * FROM t1 JOIN lateral (SELECT * FROM t2 JOIN t4 ON t2.c1 = t4.c1 AND t2.c1 = t1.c1);
+SELECT * FROM t1 LEFT JOIN lateral (SELECT * FROM t4 LEFT JOIN t2 ON t2.c1 = t4.c1 AND t2.c1 = t1.c1);
+

Review Comment:
   Shall we add one more test for inner join with non-equality join conditions here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1301444388


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -4268,6 +4268,16 @@ object SQLConf {
       .checkValue(_ >= 0, "The threshold of cached local relations must not be negative")
       .createWithDefault(64 * 1024 * 1024)
 
+  val DECORRELATE_JOIN_PREDICATE_ENABLED =
+    buildConf("spark.sql.optimizer.decorrelateJoinPredicate.enabled")
+      .internal()
+      .doc("Decorrelate scalar and lateral subqueries with correlated references in join " +
+        "predicates. This configuration is only effective when " +
+        "'${DECORRELATE_INNER_QUERY_ENABLED.key}' is true.")

Review Comment:
   Missed string interpolation, here is the PR w/ the fix: https://github.com/apache/spark/pull/42607



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1282510234


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -826,8 +882,13 @@ object DecorrelateInnerQuery extends PredicateHelper {
             } else {
               (right, Nil, AttributeMap.empty[Attribute])
             }
-            val newOuterReferenceMap = leftOuterReferenceMap ++ rightOuterReferenceMap
-            val newJoinCond = leftJoinCond ++ rightJoinCond
+            val newOuterReferenceMap = leftOuterReferenceMap ++ rightOuterReferenceMap ++
+              equivalences
+            val newCorrelated =
+              if (shouldDecorrelatePredicates) {
+                replaceOuterReferences(predicates, newOuterReferenceMap)

Review Comment:
   done, added a test as well.
   we can just keep all the join's predicates for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1267038310


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {
+              // Similar to Filters above, we split the join condition (if present) into correlated
+              // and uncorrelated predicates, and separately handle joins under set and aggregation
+              // operations.
+              if (shouldDecorrelatePredicates) {
+                val conditions =
+                  if (condition.isDefined) splitConjunctivePredicates(condition.get)
+                  else Seq.empty[Expression]
+                val (correlated, uncorrelated) = conditions.partition(containsOuter)
+                val equivalences =
+                  if (underSetOp) AttributeMap.empty[Attribute]
+                  else collectEquivalentOuterReferences(correlated)
+                var (equalityCond, predicates) =
+                  if (underSetOp) (Seq.empty[Expression], correlated)
+                  else correlated.partition(canPullUpOverAgg)
+                // Fully preserve the join predicate for non-inner joins.
+                if (!isInnerJoin) {
+                  predicates = predicates ++ equalityCond
+                }
+                (correlated, uncorrelated, equalityCond, predicates, equivalences)
+              } else {
+                (Seq.empty[Expression],
+                  if (condition.isEmpty) Seq.empty[Expression] else Seq(condition.get),
+                  Seq.empty[Expression],
+                  Seq.empty[Expression],
+                  AttributeMap.empty[Attribute])
+              }
+            }
+
+            val shouldDecorrelatePredicates =
+              SQLConf.get.getConf(SQLConf.DECORRELATE_JOIN_PREDICATE_ENABLED)
+            if (!shouldDecorrelatePredicates) {
+              val outerReferences = collectOuterReferences(j.expressions)
+              // Join condition containing outer references is not supported.
+              assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
+            }
+            val (correlated, uncorrelated, equalityCond, predicates, equivalences) =
+              splitCorrelatedPredicate(condition, joinType == Inner, shouldDecorrelatePredicates)
+            val outerReferences = collectOuterReferences(j.expressions) ++
+              collectOuterReferences(predicates)
+            val newOuterReferences =
+              parentOuterReferences ++ outerReferences -- equivalences.keySet
+            var shouldPushToLeft = joinType match {
               case LeftOuter | LeftSemiOrAnti(_) | FullOuter => true
               case _ => hasOuterReferences(left)
             }
             val shouldPushToRight = joinType match {
               case RightOuter | FullOuter => true
               case _ => hasOuterReferences(right)
             }
+            if (shouldDecorrelatePredicates && !shouldPushToLeft && !shouldPushToRight
+              && !correlated.isEmpty) {

Review Comment:
   good catch!
   
   note that there would not be any domain join even with 'correlated.isEmpty', as newOuterReferences would have those references involved in equivalences removed (few lines above this one).
   However, with this change it is a bit more explicit.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jchen5 commented on pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "jchen5 (via GitHub)" <gi...@apache.org>.
jchen5 commented on PR #41301:
URL: https://github.com/apache/spark/pull/41301#issuecomment-1577100399

   CC @cloud-fan @allisonwang-db 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jchen5 commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "jchen5 (via GitHub)" <gi...@apache.org>.
jchen5 commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1214962726


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {
+              // Similar to Filters above, we split the join condition (if present) into correlated
+              // and uncorrelated predicates, and separately handle joins under set and aggregation
+              // operations.
+              if (shouldDecorrelatePredicates) {
+                val conditions =
+                  if (condition.isDefined) splitConjunctivePredicates(condition.get)
+                  else Seq.empty[Expression]
+                val (correlated, uncorrelated) = conditions.partition(containsOuter)
+                val equivalences =
+                  if (underSetOp) AttributeMap.empty[Attribute]
+                  else collectEquivalentOuterReferences(correlated)
+                var (equalityCond, predicates) =
+                  if (underSetOp) (Seq.empty[Expression], correlated)
+                  else correlated.partition(canPullUpOverAgg)

Review Comment:
   I think we can do
   ```
   else if (aggregated) collectEquivalentOuterReferences(correlated)
   else correlated
   ```
   so that if it's not aggregated, we can directly use all the correlated predicates without adding a DomainJoin, equivalent to the Filter logic around
   ```
                 // Results of this sub-tree is not aggregated, so all correlated predicates
                 // can be directly used as outer query join conditions.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {
+              // Similar to Filters above, we split the join condition (if present) into correlated
+              // and uncorrelated predicates, and separately handle joins under set and aggregation
+              // operations.
+              if (shouldDecorrelatePredicates) {
+                val conditions =
+                  if (condition.isDefined) splitConjunctivePredicates(condition.get)
+                  else Seq.empty[Expression]
+                val (correlated, uncorrelated) = conditions.partition(containsOuter)
+                val equivalences =
+                  if (underSetOp) AttributeMap.empty[Attribute]
+                  else collectEquivalentOuterReferences(correlated)
+                var (equalityCond, predicates) =
+                  if (underSetOp) (Seq.empty[Expression], correlated)
+                  else correlated.partition(canPullUpOverAgg)
+                // Fully preserve the join predicate for non-inner joins.
+                if (!isInnerJoin) {
+                  predicates = predicates ++ equalityCond

Review Comment:
   Maybe slightly better to set `predicates = correlated`, to preserve the ordering of the original predicates?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {

Review Comment:
   Can you add a comment that the results are (correlated, uncorrelated, equalityCond, predicates, equivalences) and maybe a brief description of them



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {
+              // Similar to Filters above, we split the join condition (if present) into correlated
+              // and uncorrelated predicates, and separately handle joins under set and aggregation
+              // operations.
+              if (shouldDecorrelatePredicates) {
+                val conditions =
+                  if (condition.isDefined) splitConjunctivePredicates(condition.get)
+                  else Seq.empty[Expression]
+                val (correlated, uncorrelated) = conditions.partition(containsOuter)
+                val equivalences =
+                  if (underSetOp) AttributeMap.empty[Attribute]
+                  else collectEquivalentOuterReferences(correlated)
+                var (equalityCond, predicates) =
+                  if (underSetOp) (Seq.empty[Expression], correlated)
+                  else correlated.partition(canPullUpOverAgg)
+                // Fully preserve the join predicate for non-inner joins.
+                if (!isInnerJoin) {
+                  predicates = predicates ++ equalityCond
+                }
+                (correlated, uncorrelated, equalityCond, predicates, equivalences)
+              } else {
+                (Seq.empty[Expression],
+                  if (condition.isEmpty) Seq.empty[Expression] else Seq(condition.get),
+                  Seq.empty[Expression],
+                  Seq.empty[Expression],
+                  AttributeMap.empty[Attribute])
+              }
+            }
+
+            val shouldDecorrelatePredicates =
+              SQLConf.get.getConf(SQLConf.DECORRELATE_JOIN_PREDICATE_ENABLED)
+            if (!shouldDecorrelatePredicates) {
+              val outerReferences = collectOuterReferences(j.expressions)
+              // Join condition containing outer references is not supported.
+              assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
+            }
+            val (correlated, uncorrelated, equalityCond, predicates, equivalences) =
+              splitCorrelatedPredicate(condition, joinType == Inner, shouldDecorrelatePredicates)
+            val outerReferences = collectOuterReferences(j.expressions) ++
+              collectOuterReferences(predicates)
+            val newOuterReferences =
+              parentOuterReferences ++ outerReferences -- equivalences.keySet
+            var shouldPushToLeft = joinType match {
               case LeftOuter | LeftSemiOrAnti(_) | FullOuter => true
               case _ => hasOuterReferences(left)
             }
             val shouldPushToRight = joinType match {
               case RightOuter | FullOuter => true
               case _ => hasOuterReferences(right)
             }
+            if (shouldDecorrelatePredicates && !shouldPushToLeft && !shouldPushToRight
+              && !correlated.isEmpty) {

Review Comment:
   Can we change this check from correlated.isEmpty to predicates.isEmpty? I.e. if all the correlated predicates are in equalityCond and we can directly use them as join conditions, then I think we shouldn't need to add a DomainJoin.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala:
##########
@@ -454,4 +460,85 @@ class DecorrelateInnerQuerySuite extends PlanTest {
             DomainJoin(Seq(x), testRelation))))
     check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x))
   }
+
+  test("SPARK-43780: aggregation in subquery with correlated equi-join") {
+    // Join in the subquery is on equi-predicates, so all the correlated references can be
+    // substituted by equivalent ones from the outer query, and domain join is not needed.
+    val outerPlan = testRelation
+    val innerPlan =
+      Aggregate(
+        Seq.empty[Expression], Seq(Alias(count(Literal(1)), "a")()),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, Inner,
+            Some(And(x === a3, y === OuterReference(a))), JoinHint.NONE)))
+
+    val correctAnswer =
+      Aggregate(
+        Seq(y), Seq(Alias(count(Literal(1)), "a")(), y),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, Inner, Some(x === a3), JoinHint.NONE)))
+    check(innerPlan, outerPlan, correctAnswer, Seq(y === a))
+  }
+
+  test("SPARK-43780: aggregation in subquery with correlated non-equi-join") {
+    // Join in the subquery is on non-equi-predicate, so we introduce a DomainJoin.
+    val outerPlan = testRelation
+    val innerPlan =
+      Aggregate(
+        Seq.empty[Expression], Seq(Alias(count(Literal(1)), "a")()),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, Inner,
+            Some(And(x === a3, y > OuterReference(a))), JoinHint.NONE)))
+    val correctAnswer =
+      Aggregate(
+        Seq(a), Seq(Alias(count(Literal(1)), "a")(), a),
+        Project(Seq(x, y, a3, b3, a),
+          Join(
+            DomainJoin(Seq(a), testRelation2),
+            testRelation3, Inner, Some(And(x === a3, y > a)), JoinHint.NONE)))
+    check(innerPlan, outerPlan, correctAnswer, Seq(a <=> a))
+  }
+
+  test("SPARK-43780: aggregation in subquery with correlated left join") {
+    // Join in the subquery is on equi-predicates, so all the correlated references can be
+    // substituted by equivalent ones from the outer query, and domain join is not needed.
+    val outerPlan = testRelation
+    val innerPlan =
+      Aggregate(
+        Seq.empty[Expression], Seq(Alias(count(Literal(1)), "a")()),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, LeftOuter,
+            Some(And(x === a3, y === OuterReference(a))), JoinHint.NONE)))

Review Comment:
   Can we also add a test with left join where the correlated predicate involves the right (outer) side? Then we would need to have a DomainJoin, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] allisonwang-db commented on pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on PR #41301:
URL: https://github.com/apache/spark/pull/41301#issuecomment-1677719248

   cc @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1290761412


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -4268,6 +4268,14 @@ object SQLConf {
       .checkValue(_ >= 0, "The threshold of cached local relations must not be negative")
       .createWithDefault(64 * 1024 * 1024)
 
+  val DECORRELATE_JOIN_PREDICATE_ENABLED =
+    buildConf("spark.sql.optimizer.decorrelateJoinPredicate.enabled")
+      .internal()
+      .doc("Decorrelate subqueries with correlated references in join predicates.")
+      .version("3.5.0")

Review Comment:
   Done



##########
sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql:
##########
@@ -101,6 +101,10 @@ SELECT * FROM t1 WHERE c1 = (SELECT MIN(a) FROM t2, LATERAL (SELECT c1 AS a));
 -- lateral join inside correlated subquery
 SELECT * FROM t1 WHERE c1 = (SELECT MIN(a) FROM t2, LATERAL (SELECT c1 AS a) WHERE c1 = t1.c1);
 
+-- join condition has a correlated reference to the left side of the lateral join
+SELECT * FROM t1 JOIN lateral (SELECT * FROM t2 JOIN t4 ON t2.c1 = t4.c1 AND t2.c1 = t1.c1);
+SELECT * FROM t1 LEFT JOIN lateral (SELECT * FROM t4 LEFT JOIN t2 ON t2.c1 = t4.c1 AND t2.c1 = t1.c1);
+

Review Comment:
   Done



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -4268,6 +4268,14 @@ object SQLConf {
       .checkValue(_ >= 0, "The threshold of cached local relations must not be negative")
       .createWithDefault(64 * 1024 * 1024)
 
+  val DECORRELATE_JOIN_PREDICATE_ENABLED =
+    buildConf("spark.sql.optimizer.decorrelateJoinPredicate.enabled")
+      .internal()
+      .doc("Decorrelate subqueries with correlated references in join predicates.")

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1290765152


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1290761285


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,74 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            // Given 'condition', computes the tuple of
+            // (correlated, uncorrelated, equalityCond, predicates, equivalences).
+            // 'correlated' and 'uncorrelated' are the conjuncts with (resp. without)
+            // outer (correlated) references. Furthermore, correlated conjuncts are split
+            // into 'equalityCond' (those that are equalities) and all rest ('predicates').
+            // 'equivalences' track equivalent attributes given 'equalityCond'.
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {

Review Comment:
   Done



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,74 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            // Given 'condition', computes the tuple of
+            // (correlated, uncorrelated, equalityCond, predicates, equivalences).
+            // 'correlated' and 'uncorrelated' are the conjuncts with (resp. without)
+            // outer (correlated) references. Furthermore, correlated conjuncts are split
+            // into 'equalityCond' (those that are equalities) and all rest ('predicates').
+            // 'equivalences' track equivalent attributes given 'equalityCond'.
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #41301:
URL: https://github.com/apache/spark/pull/41301#issuecomment-1678316497

   The failure is unrelated, thanks, merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jchen5 commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "jchen5 (via GitHub)" <gi...@apache.org>.
jchen5 commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1273821132


##########
sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out:
##########
@@ -572,6 +572,36 @@ struct<c1:int,c2:int>
 0	1
 
 
+-- !query
+SELECT * FROM t1 JOIN lateral (SELECT * FROM t2 JOIN t4 ON t2.c1 = t4.c1 AND t2.c1 = t1.c1)
+-- !query schema
+struct<c1:int,c2:int,c1:int,c2:int,c1:int,c2:int>
+-- !query output
+0	1	0	2	0	1
+0	1	0	2	0	2
+0	1	0	3	0	1
+0	1	0	3	0	2
+
+
+-- !query
+SELECT * FROM t1 LEFT JOIN lateral (SELECT * FROM t4 LEFT JOIN t2 ON t2.c1 = t4.c1 AND t2.c1 = t1.c1)
+-- !query schema
+struct<c1:int,c2:int,c1:int,c2:int,c1:int,c2:int>
+-- !query output
+0	1	0	1	0	2
+0	1	0	1	0	3
+0	1	0	2	0	2
+0	1	0	2	0	3
+0	1	1	1	NULL	NULL
+0	1	1	3	NULL	NULL
+1	2	0	1	0	2

Review Comment:
   Looks like these results are incorrect - for example, this tuple has t1.c1 = 1, t2.c1 = 0, t4.c1 = 0. So the result of t4 left join t2 should be a null row because t2.c1 = t1.c1 is false. 
   
   I checked out the PR and tried running it locally and actually got a different set of results which looks correct, so maybe you just need to update the golden file?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -826,8 +882,13 @@ object DecorrelateInnerQuery extends PredicateHelper {
             } else {
               (right, Nil, AttributeMap.empty[Attribute])
             }
-            val newOuterReferenceMap = leftOuterReferenceMap ++ rightOuterReferenceMap
-            val newJoinCond = leftJoinCond ++ rightJoinCond
+            val newOuterReferenceMap = leftOuterReferenceMap ++ rightOuterReferenceMap ++
+              equivalences
+            val newCorrelated =
+              if (shouldDecorrelatePredicates) {
+                replaceOuterReferences(predicates, newOuterReferenceMap)

Review Comment:
   Here we are pulling up the equalityCond predicates (predicates is everything that isn't in equalityCond) - they get moved from the join ON cond to the top-level join.
   
   I was thinking that it might not be safe to pull them up, we might need to leave them in the ON cond. It seems like there could be a problem if other columns of the outer table are referenced after the join, because whether they are null or not would depend on the outer join cond and we can't just pull it all the way to the top-level join.
   
   Example: what about inner query `t1 left join t2 on t1.x = t2.x and t2.y = outer(a) where t2.z is null` - looks like in the current code we'd pull the predicate up and it would become `t1 left join t2 on t1.x = t2.x where t2.z is null`, the set of rows that match the ON cond would change and so the `t2.z is null` filter would change.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {
+              // Similar to Filters above, we split the join condition (if present) into correlated
+              // and uncorrelated predicates, and separately handle joins under set and aggregation
+              // operations.
+              if (shouldDecorrelatePredicates) {
+                val conditions =
+                  if (condition.isDefined) splitConjunctivePredicates(condition.get)
+                  else Seq.empty[Expression]
+                val (correlated, uncorrelated) = conditions.partition(containsOuter)
+                val equivalences =
+                  if (underSetOp) AttributeMap.empty[Attribute]
+                  else collectEquivalentOuterReferences(correlated)
+                var (equalityCond, predicates) =
+                  if (underSetOp) (Seq.empty[Expression], correlated)
+                  else correlated.partition(canPullUpOverAgg)

Review Comment:
   Sorry, I mixed up some code in what I wrote, what I meant was like this:
   ```
   val (predicatesPulledUp, predicatesNotPulledUp) =
   if (underSetOp || !isInnerJoin) (Seq.empty[Expression], correlated)
   else if (aggregated) correlated.partition(canPullUpOverAgg)
   else (correlated, Seq.empty[Expression])
   ```
   HOWEVER on second thought I'm not sure if we actually can safely pull up those predicates for non-inner joins - see my other comment.
   
   But in the case of Filter or inner join conds, I think this would work. If we are under an aggregate, then only equality conds can be pulled up. But if we're not under an aggregate or set op, then we can pull all conds up.
   
   If we walk through the code in the Filter case for `if (aggregated || underSetOp)` compared to the `else` case, I think this works out to be equivalent. In the aggregate case, we add the pull-up-able predicates (equalityCond) to `newJoinCond` and the non-pull-upable predicates stay in `newFilter`. In `else` case (the non-aggregate non-setop case) we add all of `correlated` to `newJoinCond` and remove it all from `newFilter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1282513447


##########
sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out:
##########
@@ -572,6 +572,36 @@ struct<c1:int,c2:int>
 0	1
 
 
+-- !query
+SELECT * FROM t1 JOIN lateral (SELECT * FROM t2 JOIN t4 ON t2.c1 = t4.c1 AND t2.c1 = t1.c1)
+-- !query schema
+struct<c1:int,c2:int,c1:int,c2:int,c1:int,c2:int>
+-- !query output
+0	1	0	2	0	1
+0	1	0	2	0	2
+0	1	0	3	0	1
+0	1	0	3	0	2
+
+
+-- !query
+SELECT * FROM t1 LEFT JOIN lateral (SELECT * FROM t4 LEFT JOIN t2 ON t2.c1 = t4.c1 AND t2.c1 = t1.c1)
+-- !query schema
+struct<c1:int,c2:int,c1:int,c2:int,c1:int,c2:int>
+-- !query output
+0	1	0	1	0	2
+0	1	0	1	0	3
+0	1	0	2	0	2
+0	1	0	2	0	3
+0	1	1	1	NULL	NULL
+0	1	1	3	NULL	NULL
+1	2	0	1	0	2

Review Comment:
   fixed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1267038851


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {
+              // Similar to Filters above, we split the join condition (if present) into correlated
+              // and uncorrelated predicates, and separately handle joins under set and aggregation
+              // operations.
+              if (shouldDecorrelatePredicates) {
+                val conditions =
+                  if (condition.isDefined) splitConjunctivePredicates(condition.get)
+                  else Seq.empty[Expression]
+                val (correlated, uncorrelated) = conditions.partition(containsOuter)
+                val equivalences =
+                  if (underSetOp) AttributeMap.empty[Attribute]
+                  else collectEquivalentOuterReferences(correlated)
+                var (equalityCond, predicates) =
+                  if (underSetOp) (Seq.empty[Expression], correlated)
+                  else correlated.partition(canPullUpOverAgg)

Review Comment:
   Not sure I follow exactly, we still need to split the 'correlated' into two sets (equalities and everything else), right?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {
+              // Similar to Filters above, we split the join condition (if present) into correlated
+              // and uncorrelated predicates, and separately handle joins under set and aggregation
+              // operations.
+              if (shouldDecorrelatePredicates) {
+                val conditions =
+                  if (condition.isDefined) splitConjunctivePredicates(condition.get)
+                  else Seq.empty[Expression]
+                val (correlated, uncorrelated) = conditions.partition(containsOuter)
+                val equivalences =
+                  if (underSetOp) AttributeMap.empty[Attribute]
+                  else collectEquivalentOuterReferences(correlated)
+                var (equalityCond, predicates) =
+                  if (underSetOp) (Seq.empty[Expression], correlated)
+                  else correlated.partition(canPullUpOverAgg)
+                // Fully preserve the join predicate for non-inner joins.
+                if (!isInnerJoin) {
+                  predicates = predicates ++ equalityCond

Review Comment:
   Done



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1267038014


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala:
##########
@@ -454,4 +460,85 @@ class DecorrelateInnerQuerySuite extends PlanTest {
             DomainJoin(Seq(x), testRelation))))
     check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x))
   }
+
+  test("SPARK-43780: aggregation in subquery with correlated equi-join") {
+    // Join in the subquery is on equi-predicates, so all the correlated references can be
+    // substituted by equivalent ones from the outer query, and domain join is not needed.
+    val outerPlan = testRelation
+    val innerPlan =
+      Aggregate(
+        Seq.empty[Expression], Seq(Alias(count(Literal(1)), "a")()),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, Inner,
+            Some(And(x === a3, y === OuterReference(a))), JoinHint.NONE)))
+
+    val correctAnswer =
+      Aggregate(
+        Seq(y), Seq(Alias(count(Literal(1)), "a")(), y),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, Inner, Some(x === a3), JoinHint.NONE)))
+    check(innerPlan, outerPlan, correctAnswer, Seq(y === a))
+  }
+
+  test("SPARK-43780: aggregation in subquery with correlated non-equi-join") {
+    // Join in the subquery is on non-equi-predicate, so we introduce a DomainJoin.
+    val outerPlan = testRelation
+    val innerPlan =
+      Aggregate(
+        Seq.empty[Expression], Seq(Alias(count(Literal(1)), "a")()),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, Inner,
+            Some(And(x === a3, y > OuterReference(a))), JoinHint.NONE)))
+    val correctAnswer =
+      Aggregate(
+        Seq(a), Seq(Alias(count(Literal(1)), "a")(), a),
+        Project(Seq(x, y, a3, b3, a),
+          Join(
+            DomainJoin(Seq(a), testRelation2),
+            testRelation3, Inner, Some(And(x === a3, y > a)), JoinHint.NONE)))
+    check(innerPlan, outerPlan, correctAnswer, Seq(a <=> a))
+  }
+
+  test("SPARK-43780: aggregation in subquery with correlated left join") {
+    // Join in the subquery is on equi-predicates, so all the correlated references can be
+    // substituted by equivalent ones from the outer query, and domain join is not needed.
+    val outerPlan = testRelation
+    val innerPlan =
+      Aggregate(
+        Seq.empty[Expression], Seq(Alias(count(Literal(1)), "a")()),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, LeftOuter,
+            Some(And(x === a3, y === OuterReference(a))), JoinHint.NONE)))

Review Comment:
   Added.
   However, there is no DomainJoin. I don't think it matters which side (optional or preserved) is involved in the equivalence class, the variable can still be replaced without the domain join. 
   In the test that I added, the correlated conjunct is b3 == OuterReference(b). Since it is an outer join, after the join either b3 == b or b3 == NULL. Since the top-level join (introduced by the subquery) is a left outer join (unless narrowed), we can just substitute OuterReference(b) with b3, so that the NULL values for b3 will be present after the top-level join.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1282510138


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {
+              // Similar to Filters above, we split the join condition (if present) into correlated
+              // and uncorrelated predicates, and separately handle joins under set and aggregation
+              // operations.
+              if (shouldDecorrelatePredicates) {
+                val conditions =
+                  if (condition.isDefined) splitConjunctivePredicates(condition.get)
+                  else Seq.empty[Expression]
+                val (correlated, uncorrelated) = conditions.partition(containsOuter)
+                val equivalences =
+                  if (underSetOp) AttributeMap.empty[Attribute]
+                  else collectEquivalentOuterReferences(correlated)
+                var (equalityCond, predicates) =
+                  if (underSetOp) (Seq.empty[Expression], correlated)
+                  else correlated.partition(canPullUpOverAgg)

Review Comment:
   I agree about non-equi joins -- no pullups.
   For equi joins, indeed looks like we can pull something up, but would not it be less efficient? usually we want to keep applicable filters as low as possible..
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jchen5 commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates

Posted by "jchen5 (via GitHub)" <gi...@apache.org>.
jchen5 commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1283253415


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,67 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {
+              // Similar to Filters above, we split the join condition (if present) into correlated
+              // and uncorrelated predicates, and separately handle joins under set and aggregation
+              // operations.
+              if (shouldDecorrelatePredicates) {
+                val conditions =
+                  if (condition.isDefined) splitConjunctivePredicates(condition.get)
+                  else Seq.empty[Expression]
+                val (correlated, uncorrelated) = conditions.partition(containsOuter)
+                val equivalences =
+                  if (underSetOp) AttributeMap.empty[Attribute]
+                  else collectEquivalentOuterReferences(correlated)
+                var (equalityCond, predicates) =
+                  if (underSetOp) (Seq.empty[Expression], correlated)
+                  else correlated.partition(canPullUpOverAgg)

Review Comment:
   Hmm, good point. In fact, probably what is happening today in the Filter codepath is decorrelation takes the filter `a = outer(a)` and moves it to the top-level join, and then later it gets pushed back down again (if possible).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1293716062


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -4268,6 +4268,15 @@ object SQLConf {
       .checkValue(_ >= 0, "The threshold of cached local relations must not be negative")
       .createWithDefault(64 * 1024 * 1024)
 
+  val DECORRELATE_JOIN_PREDICATE_ENABLED =
+    buildConf("spark.sql.optimizer.decorrelateJoinPredicate.enabled")
+      .internal()
+      .doc("Decorrelate scalar and lateral subqueries with correlated references in join " +
+        "predicates. Only enabled if DECORRELATE_INNER_QUERY_ENABLED is enabled")

Review Comment:
   done



##########
sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out:
##########
@@ -660,3 +660,39 @@ where (select t2.id c
 struct<id:bigint>
 -- !query output
 1
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(t1c) FROM
+  (SELECT t1c
+   FROM   t1 JOIN t2 ON (t1a = t0a AND t2b = t1b))
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+1	1
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(t1c) FROM
+  (SELECT t1c
+   FROM   t1 JOIN t2 ON (t1a < t0a AND t2b >= t1b))
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+2	0

Review Comment:
   yes, verified with Postgresql



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] allisonwang-db commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1292408953


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -4268,6 +4268,15 @@ object SQLConf {
       .checkValue(_ >= 0, "The threshold of cached local relations must not be negative")
       .createWithDefault(64 * 1024 * 1024)
 
+  val DECORRELATE_JOIN_PREDICATE_ENABLED =
+    buildConf("spark.sql.optimizer.decorrelateJoinPredicate.enabled")
+      .internal()
+      .doc("Decorrelate scalar and lateral subqueries with correlated references in join " +
+        "predicates. Only enabled if DECORRELATE_INNER_QUERY_ENABLED is enabled")

Review Comment:
   ```suggestion
           s"predicates. This configuration is only effective when '${DECORRELATE_INNER_QUERY_ENABLED.key}' is true.")
   ```



##########
sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out:
##########
@@ -660,3 +660,39 @@ where (select t2.id c
 struct<id:bigint>
 -- !query output
 1
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(t1c) FROM
+  (SELECT t1c
+   FROM   t1 JOIN t2 ON (t1a = t0a AND t2b = t1b))
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+1	1
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(t1c) FROM
+  (SELECT t1c
+   FROM   t1 JOIN t2 ON (t1a < t0a AND t2b >= t1b))
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+2	0

Review Comment:
   Just want to make sure, we have verified these results are correct, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan closed pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries
URL: https://github.com/apache/spark/pull/41301


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org