You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "agubichev (via GitHub)" <gi...@apache.org> on 2023/08/10 23:09:40 UTC

[GitHub] [spark] agubichev commented on a diff in pull request #41301: [SPARK-43780][SQL] Support correlated references in join predicates for scalar and lateral subqueries

agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1290761285


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,74 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            // Given 'condition', computes the tuple of
+            // (correlated, uncorrelated, equalityCond, predicates, equivalences).
+            // 'correlated' and 'uncorrelated' are the conjuncts with (resp. without)
+            // outer (correlated) references. Furthermore, correlated conjuncts are split
+            // into 'equalityCond' (those that are equalities) and all rest ('predicates').
+            // 'equivalences' track equivalent attributes given 'equalityCond'.
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):
+            (Seq[Expression], Seq[Expression], Seq[Expression],
+              Seq[Expression], AttributeMap[Attribute]) = {

Review Comment:
   Done



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -804,18 +804,74 @@ object DecorrelateInnerQuery extends PredicateHelper {
             (d.copy(child = newChild), joinCond, outerReferenceMap)
 
           case j @ Join(left, right, joinType, condition, _) =>
-            val outerReferences = collectOuterReferences(j.expressions)
-            // Join condition containing outer references is not supported.
-            assert(outerReferences.isEmpty, s"Correlated column is not allowed in join: $j")
-            val newOuterReferences = parentOuterReferences ++ outerReferences
-            val shouldPushToLeft = joinType match {
+            // Given 'condition', computes the tuple of
+            // (correlated, uncorrelated, equalityCond, predicates, equivalences).
+            // 'correlated' and 'uncorrelated' are the conjuncts with (resp. without)
+            // outer (correlated) references. Furthermore, correlated conjuncts are split
+            // into 'equalityCond' (those that are equalities) and all rest ('predicates').
+            // 'equivalences' track equivalent attributes given 'equalityCond'.
+            def splitCorrelatedPredicate(condition: Option[Expression],
+                                         isInnerJoin: Boolean,
+                                         shouldDecorrelatePredicates: Boolean):

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org