Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/01 22:41:31 UTC

[GitHub] [spark] c21 commented on a change in pull request #31691: [SPARK-34575][SQL] Push down limit through window when partitionSpec is empty

c21 commented on a change in pull request #31691:
URL: https://github.com/apache/spark/pull/31691#discussion_r585100057



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
         case _ => join
       }
       LocalLimit(exp, newJoin)
+
+    case LocalLimit(limitExpr @ IntegerLiteral(limitVal),

Review comment:
       nit: shall we update the comment at L554 and the one here, as was done for `UNION ALL` and `JOIN`?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
         case _ => join
       }
       LocalLimit(exp, newJoin)
+
+    case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
+        window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
+            WindowSpecDefinition(Nil, orderSpec,
+                SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
+        if child.maxRows.forall( _ > limitVal) =>
+      LocalLimit(

Review comment:
       Do we still need the `LocalLimit` here? We already restrict the window expression to `RankLike` or `RowNumber`, so we know the number of rows is the same before and after the window, right?
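
       The row-count claim above can be illustrated with a minimal sketch in plain Scala collections. It does not use Spark: `rowNumber` and `rank` are hypothetical helpers that stand in for `row_number()` and `rank()` over an ordered window with an empty partition spec, and the rows are modeled as plain integers.

```scala
object WindowRowCount {
  // Hypothetical stand-in for row_number() OVER (ORDER BY v):
  // sorts the rows and pairs each one with its 1-based position.
  def rowNumber(rows: Seq[Int]): Seq[(Int, Int)] =
    rows.sorted.zipWithIndex.map { case (v, i) => (v, i + 1) }

  // Hypothetical stand-in for rank() OVER (ORDER BY v):
  // tied rows share the position of the first row with that value.
  def rank(rows: Seq[Int]): Seq[(Int, Int)] = {
    val sorted = rows.sorted
    sorted.map(v => (v, sorted.indexOf(v) + 1))
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(30, 10, 20, 10)
    // Both helpers emit exactly one output row per input row.
    assert(rowNumber(rows).size == rows.size)
    assert(rank(rows).size == rows.size)
    println(rowNumber(rows))
    println(rank(rows))
  }
}
```

       Both helpers only attach a value to each row, so neither adds nor drops rows. Whether that makes the outer `LocalLimit` redundant in the actual rule is the question the comment raises; the sketch does not settle it.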

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
         case _ => join
       }
       LocalLimit(exp, newJoin)
+
+    case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
+        window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
+            WindowSpecDefinition(Nil, orderSpec,
+                SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
+        if child.maxRows.forall( _ > limitVal) =>
+      LocalLimit(
+        limitExpr = limitExpr,
+        child = window.copy(child = Limit(limitExpr, Sort(orderSpec, true, child))))

Review comment:
       Wondering why we need an extra `Sort` here. Shouldn't the physical plan rule `EnsureRequirements` add the sort between the window and the limit?
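
       One way to explore this question is to compare where the sort sits relative to the pushed-down limit. The following is a minimal sketch in plain Scala collections, not Spark code and not the PR author's answer: all names are hypothetical, rows are plain integers, and `take` models a limit that keeps whichever rows come first in the input.

```scala
object LimitBelowWindow {
  // Hypothetical stand-in for row_number() OVER (ORDER BY v).
  // The window sorts whatever rows it receives.
  def rowNumber(rows: Seq[Int]): Seq[(Int, Int)] =
    rows.sorted.zipWithIndex.map { case (v, i) => (v, i + 1) }

  // Reference semantics: window over all rows, then limit.
  def windowThenLimit(rows: Seq[Int], n: Int): Seq[(Int, Int)] =
    rowNumber(rows).take(n)

  // Limit pushed below the window with no sort underneath it:
  // sorting happens only after the limit has already picked rows.
  def limitThenWindow(rows: Seq[Int], n: Int): Seq[(Int, Int)] =
    rowNumber(rows.take(n))

  // Limit pushed below the window with an explicit sort underneath it.
  def sortLimitThenWindow(rows: Seq[Int], n: Int): Seq[(Int, Int)] =
    rowNumber(rows.sorted.take(n))

  def main(args: Array[String]): Unit = {
    val rows = Seq(30, 10, 20, 40)
    val expected = windowThenLimit(rows, 2)
    // Without the sort below the limit, the wrong rows are kept.
    assert(limitThenWindow(rows, 2) != expected)
    // With the sort below the limit, the result matches the reference.
    assert(sortLimitThenWindow(rows, 2) == expected)
  }
}
```

       In this model, a sort placed between the window and the limit runs after the limit has chosen its rows, so it cannot change which rows survive. Only a sort below the limit makes the pushed-down plan agree with the original one.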

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -613,6 +613,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
         case _ => join
       }
       LocalLimit(exp, newJoin)
+
+    case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
+        window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
+            WindowSpecDefinition(Nil, orderSpec,
+                SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
+        if child.maxRows.forall( _ > limitVal) =>

Review comment:
       nit: there is an unnecessary extra space in `( _`



