Posted to reviews@spark.apache.org by "allisonwang-db (via GitHub)" <gi...@apache.org> on 2023/09/12 16:48:06 UTC

[GitHub] [spark] allisonwang-db commented on a diff in pull request #42705: [SPARK-36191][SQL] Handle limit and order by in correlated scalar (lateral) subqueries

allisonwang-db commented on code in PR #42705:
URL: https://github.com/apache/spark/pull/42705#discussion_r1323305701


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1405,6 +1405,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
           failOnInvalidOuterReference(g)
           checkPlan(g.child, aggregated, canContainOuter)
 
+        // Correlated subquery can have a LIMIT clause
+        case l@Limit(_, input) =>

Review Comment:
   ```suggestion
           case l @ Limit(_, input) =>
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -655,6 +655,39 @@ object DecorrelateInnerQuery extends PredicateHelper {
             val newProject = Project(newProjectList ++ referencesToAdd, newChild)
             (newProject, joinCond, outerReferenceMap)
 
+          case Limit(limit, input) =>
+            // LIMIT K (with potential ORDER BY) is decorrelated by computing K rows per every
+            // domain value via a row_number() window function. For example, for a subquery
+            // (SELECT T2.a FROM T2 WHERE T2.b = OuterReference(x) ORDER BY T2.c LIMIT 3)
+            // -- we need to get top 3 values of T2.a (ordering by T2.c) for every value of x.
+            // Following our general decorrelation procedure, 'x' is then replaced by T2.b, so the
+            // subquery is decorrelated as:
+            // SELECT * FROM (

Review Comment:
   Great explanation here!
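
For readers following the thread, the idea in the quoted code comment (keep the top K rows for every domain value, numbered by an ordering column) can be sketched with plain Scala collections. This is only an illustration and not the Catalyst implementation from the PR: `Row`, `TopKPerDomainValue`, and `topKPerGroup` are hypothetical names, and the sample data is made up. The columns `a`, `b`, and `c` mirror `T2.a`, `T2.b`, and `T2.c` from the comment.

```scala
// Illustrative only: plain Scala collections, not Spark's Catalyst code.
// Row stands in for a row of T2 with columns a, b, c (hypothetical type).
final case class Row(a: Int, b: Int, c: Int)

object TopKPerDomainValue {
  // Keep the first k rows (ordered by c) within each group of b, mimicking
  // a filter of the form "row_number() OVER (PARTITION BY b ORDER BY c) <= k".
  def topKPerGroup(rows: Seq[Row], k: Int): Map[Int, Seq[Row]] =
    rows.groupBy(_.b).map { case (b, group) =>
      // Number the rows of this group starting at 1, in order of c.
      val numbered = group.sortBy(_.c).zipWithIndex.map { case (r, i) => (r, i + 1) }
      b -> numbered.collect { case (r, rn) if rn <= k => r }
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample data: four rows with b = 1 and one row with b = 2.
    val t2 = Seq(Row(10, 1, 5), Row(11, 1, 2), Row(12, 1, 9), Row(13, 1, 1), Row(20, 2, 3))
    val result = topKPerGroup(t2, 3)
    // For an outer value x, the subquery result is the group whose b equals x.
    println(result.getOrElse(1, Seq.empty).map(_.a))
    println(result.getOrElse(2, Seq.empty).map(_.a))
  }
}
```

Because every group is truncated independently, the result can later be joined back to the outer query on `b = x`, which is why the limit has to be applied per domain value and not once over the whole input.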



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala:
##########
@@ -59,6 +59,25 @@ class DecorrelateInnerQuerySuite extends PlanTest {
     joinCond.zip(conditions).foreach(e => compareExpressions(e._1, e._2))
   }
 
+  private def check(
+                     outputPlan: LogicalPlan,
+                     joinCond: Seq[Expression],
+                     correctAnswer: LogicalPlan,
+                     conditions: Seq[Expression]): Unit = {
+    assert(!hasOuterReferences(outputPlan))

Review Comment:
   nit: the parameter indentation is off; continuation lines should be indented 4 spaces
   ```suggestion
     private def check(
         outputPlan: LogicalPlan,
         joinCond: Seq[Expression],
         correctAnswer: LogicalPlan,
         conditions: Seq[Expression]): Unit = {
       assert(!hasOuterReferences(outputPlan))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

