Posted to reviews@spark.apache.org by "cloud-fan (via GitHub)" <gi...@apache.org> on 2023/09/13 04:56:26 UTC

[GitHub] [spark] cloud-fan commented on a diff in pull request #42705: [SPARK-36191][SQL] Handle limit and order by in correlated scalar (lateral) subqueries

cloud-fan commented on code in PR #42705:
URL: https://github.com/apache/spark/pull/42705#discussion_r1323967912


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -655,6 +655,39 @@ object DecorrelateInnerQuery extends PredicateHelper {
             val newProject = Project(newProjectList ++ referencesToAdd, newChild)
             (newProject, joinCond, outerReferenceMap)
 
+          case Limit(limit, input) =>
+            // LIMIT K (with an optional ORDER BY) is decorrelated by computing K rows for every
+            // domain value via a row_number() window function. For example, for a subquery
+            // (SELECT T2.a FROM T2 WHERE T2.b = OuterReference(x) ORDER BY T2.c LIMIT 3)
+            // -- we need to get top 3 values of T2.a (ordering by T2.c) for every value of x.
+            // Following our general decorrelation procedure, 'x' is then replaced by T2.b, so the
+            // subquery is decorrelated as:
+            // SELECT * FROM (
+            //   SELECT T2.a, row_number() OVER (PARTITION BY T2.b ORDER BY T2.c) AS rn FROM T2)
+            // WHERE rn <= 3
+            val (child, ordering) = input match {
+              case Sort(order, _, child) => (child, order)
+              case _ => (input, Seq())
+            }
+            val (newChild, joinCond, outerReferenceMap) =
+              decorrelate(child, parentOuterReferences, aggregated = true, underSetOp)
+            val collectedChildOuterReferences = collectOuterReferencesInPlanTree(child)
+            // Add outer references to the PARTITION BY clause
+            val partitionFields = collectedChildOuterReferences.map(outerReferenceMap(_)).toSeq
+            val orderByFields = replaceOuterReferences(ordering, outerReferenceMap)
+
+            val rowNumber = WindowExpression(RowNumber(),
+              WindowSpecDefinition(partitionFields, orderByFields,
+                SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))
+            val rowNumberAlias = Alias(rowNumber, "rn_" + NamedExpression.newExprId.id)()

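For illustration, here is a self-contained DataFrame-level sketch of the same top-K-per-partition rewrite that the code comment above describes. The object name, the toy t2 data set, and the local SparkSession are made up for the example; column b stands in for the decorrelated outer reference (T2.b = x).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object LimitDecorrelationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("limit-decorrelation-sketch").getOrCreate()
    import spark.implicits._

    // Toy stand-in for T2(a, b, c); b plays the role of the correlated column T2.b.
    val t2 = Seq((1, 10, 5), (2, 10, 3), (3, 10, 7), (4, 10, 1), (5, 20, 2)).toDF("a", "b", "c")

    // Rewrite of: SELECT T2.a FROM T2 WHERE T2.b = OuterReference(x) ORDER BY T2.c LIMIT 3
    // The outer reference becomes a PARTITION BY key, the ORDER BY feeds row_number(),
    // and LIMIT 3 becomes the filter rn <= 3.
    val byBOrderedByC = Window.partitionBy(col("b")).orderBy(col("c"))
    val top3PerB = t2
      .withColumn("rn", row_number().over(byBOrderedByC))
      .where(col("rn") <= 3)
      .select("a", "b")

    top3PerB.show()
    spark.stop()
  }
}

The outer query then joins back on b = x, which is why the outer reference has to move into the PARTITION BY clause rather than remain a per-row filter.
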
Review Comment:
   I'm not sure putting the id in the name is useful. We may refresh attribute ids in a query plan, and then the id embedded in the name would be confusing. Shall we just use `rn`? The EXPLAIN result already prints the Alias expr id, so repeating it in the name adds nothing.
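
A minimal sketch of the change suggested above, assuming the context of DecorrelateInnerQuery.scala quoted in the diff (rowNumber is the WindowExpression built there); the hypothetical nameRowNumber wrapper exists only so the snippet compiles on its own:

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}

// Fixed, human-readable name; uniqueness still comes from the Alias's
// auto-generated ExprId, which EXPLAIN prints anyway.
def nameRowNumber(rowNumber: Expression): Alias = Alias(rowNumber, "rn")()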



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org