You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "zml1206 (via GitHub)" <gi...@apache.org> on 2023/12/04 02:45:38 UTC

[PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

zml1206 opened a new pull request, #44145:
URL: https://github.com/apache/spark/pull/44145

   
   ### What changes were proposed in this pull request?
   Insert node WindowGroupLimit to filter out unnecessary rows based on cumulative aggregation with limit.
   
   it supports following pattern:
   ```
   SELECT (... (row_number|rank|dense_rank|sum|max...)()
       OVER (
   PARTITION BY ...
   ORDER BY  ... ) AS v)
   LIMIT ...
   ```
   
   ### Why are the changes needed?
   Reduce the shuffle write to improve the performance. 
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   UT.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1421267218


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   Limit outside of window can be early pruned by `WindowGroupLimit`, the following three conditions must be met:
   1.The window orderSpec exists unfoldable one or all window expressions are `RowFrame`. Because when orderSpec is foldable and window expressions is `RangeFrame`, aggregation calculation requires the use of all rows in the window group.
   2.All window expressions should not have `SizeBasedWindowFunction`. Because aggregation calculation of `SizeBasedWindowFunction` same requires the use of all rows in the window group.
   3.The Limit could not be pushed down through Window. Because `LimitPushDownThroughWindow` have better performance than `WindowGroupLimit`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on PR #44145:
URL: https://github.com/apache/spark/pull/44145#issuecomment-1839890684

   @beliefer If you have time, can you help me take a look? cc @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1416825349


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,24 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:
  * {{{
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn = 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 = rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn < 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 > rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn <= 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 >= rn
+ *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 limit 5

Review Comment:
   I got it now. I will take the time to review as soon as possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1419856067


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,147 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
+        Seq(-1, 100).foreach { threshold =>
+          withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
+            // RowFrame
+            val existWindowGroupLimitRowFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+                .limit(1)
+                .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }
+            if (threshold == -1) {
+              assert(!existWindowGroupLimitRowFrame)
+            } else {
+              assert(existWindowGroupLimitRowFrame)
+            }
+            checkAnswer(df.withColumn("rn", row_number().over(window)).limit(1),
+              Seq(
+                Row("a", 4, "", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", rank().over(window2)).limit(7),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 3),
+                Row("a", 2, "y", 2),
+                Row("a", 3, "z", 1),
+                Row("a", 4, "", 5),
+                Row("a", 4, "", 5),
+                Row("b", 1, "n", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", dense_rank().over(window3)).limit(11),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 7),
+                Row("a", 2, "y", 8),
+                Row("a", 3, "z", 9),
+                Row("a", 4, "", 2),
+                Row("a", 4, "", 2),
+                Row("b", 1, "h", 5),
+                Row("b", 1, "n", 6),
+                Row("c", 1, "a", 3),
+                Row("c", 1, "z", 9),
+                Row("c", 2, nullStr, 1)
+              )
+            )
+
+            // RangeFrame
+            val existWindowGroupLimitRangeFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+              .limit(1)
+              .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }
+            if (threshold == -1) {
+              assert(!existWindowGroupLimitRangeFrame)
+            } else {
+              assert(existWindowGroupLimitRangeFrame)
+            }
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window)).limit(1),
+              Seq(
+                Row("a", 4, "", 8)
+              )
+            )
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window2)).limit(7),
+              Seq(
+                Row("a", 0, "c", 6),
+                Row("a", 1, "x", 6),
+                Row("a", 2, "y", 5),
+                Row("a", 3, "z", 3),
+                Row("a", 4, "", 14),
+                Row("a", 4, "", 14),
+                Row("b", 1, "n", 1)
+              )
+            )
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window3)).limit(11),
+              Seq(
+                Row("a", 0, "c", 11),
+                Row("a", 1, "x", 14),
+                Row("a", 2, "y", 16),
+                Row("a", 3, "z", 20),
+                Row("a", 4, "", 10),
+                Row("a", 4, "", 10),
+                Row("b", 1, "h", 12),
+                Row("b", 1, "n", 13),
+                Row("c", 1, "a", 11),
+                Row("c", 1, "z", 20),
+                Row("c", 2, nullStr, 2)
+              )
+            )
+
+            // Both RowFrame and RangeFrame exist
+            checkAnswer(
+              df.withColumn("sum_value1", sum("value")
+                .over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
+              .withColumn("sum_value2", sum("value")
+                .over(window.rangeBetween(Window.unboundedPreceding, Window.currentRow))).limit(1),
+              Seq(
+                Row("a", 4, "", 4, 8)

Review Comment:
   Looks good.



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,147 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
+        Seq(-1, 100).foreach { threshold =>
+          withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
+            // RowFrame
+            val existWindowGroupLimitRowFrame =

Review Comment:
   We can move this one to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimitSuite.scala



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,147 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
+        Seq(-1, 100).foreach { threshold =>
+          withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
+            // RowFrame
+            val existWindowGroupLimitRowFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+                .limit(1)
+                .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }
+            if (threshold == -1) {
+              assert(!existWindowGroupLimitRowFrame)
+            } else {
+              assert(existWindowGroupLimitRowFrame)
+            }
+            checkAnswer(df.withColumn("rn", row_number().over(window)).limit(1),
+              Seq(
+                Row("a", 4, "", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", rank().over(window2)).limit(7),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 3),
+                Row("a", 2, "y", 2),
+                Row("a", 3, "z", 1),
+                Row("a", 4, "", 5),
+                Row("a", 4, "", 5),
+                Row("b", 1, "n", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", dense_rank().over(window3)).limit(11),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 7),
+                Row("a", 2, "y", 8),
+                Row("a", 3, "z", 9),
+                Row("a", 4, "", 2),
+                Row("a", 4, "", 2),
+                Row("b", 1, "h", 5),
+                Row("b", 1, "n", 6),
+                Row("c", 1, "a", 3),
+                Row("c", 1, "z", 9),
+                Row("c", 2, nullStr, 1)
+              )
+            )
+
+            // RangeFrame
+            val existWindowGroupLimitRangeFrame =

Review Comment:
   ditto.



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,147 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
+        Seq(-1, 100).foreach { threshold =>
+          withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
+            // RowFrame
+            val existWindowGroupLimitRowFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+                .limit(1)
+                .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }
+            if (threshold == -1) {
+              assert(!existWindowGroupLimitRowFrame)
+            } else {
+              assert(existWindowGroupLimitRowFrame)
+            }
+            checkAnswer(df.withColumn("rn", row_number().over(window)).limit(1),
+              Seq(
+                Row("a", 4, "", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", rank().over(window2)).limit(7),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 3),
+                Row("a", 2, "y", 2),
+                Row("a", 3, "z", 1),
+                Row("a", 4, "", 5),
+                Row("a", 4, "", 5),
+                Row("b", 1, "n", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", dense_rank().over(window3)).limit(11),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 7),
+                Row("a", 2, "y", 8),
+                Row("a", 3, "z", 9),
+                Row("a", 4, "", 2),
+                Row("a", 4, "", 2),
+                Row("b", 1, "h", 5),
+                Row("b", 1, "n", 6),
+                Row("c", 1, "a", 3),
+                Row("c", 1, "z", 9),
+                Row("c", 2, nullStr, 1)
+              )
+            )
+
+            // RangeFrame
+            val existWindowGroupLimitRangeFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+              .limit(1)
+              .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }
+            if (threshold == -1) {
+              assert(!existWindowGroupLimitRangeFrame)
+            } else {
+              assert(existWindowGroupLimitRangeFrame)
+            }
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window)).limit(1),
+              Seq(
+                Row("a", 4, "", 8)
+              )
+            )
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window2)).limit(7),
+              Seq(
+                Row("a", 0, "c", 6),
+                Row("a", 1, "x", 6),
+                Row("a", 2, "y", 5),
+                Row("a", 3, "z", 3),
+                Row("a", 4, "", 14),
+                Row("a", 4, "", 14),
+                Row("b", 1, "n", 1)
+              )
+            )
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window3)).limit(11),
+              Seq(
+                Row("a", 0, "c", 11),
+                Row("a", 1, "x", 14),
+                Row("a", 2, "y", 16),
+                Row("a", 3, "z", 20),
+                Row("a", 4, "", 10),
+                Row("a", 4, "", 10),
+                Row("b", 1, "h", 12),
+                Row("b", 1, "n", 13),
+                Row("c", 1, "a", 11),
+                Row("c", 1, "z", 20),
+                Row("c", 2, nullStr, 2)
+              )
+            )
+
+            // Both RowFrame and RangeFrame exist
+            checkAnswer(
+              df.withColumn("sum_value1", sum("value")
+                .over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
+              .withColumn("sum_value2", sum("value")
+                .over(window.rangeBetween(Window.unboundedPreceding, Window.currentRow))).limit(1),
+              Seq(
+                Row("a", 4, "", 4, 8)
+              )
+            )
+
+            // Choose LimitPushDownThroughWindow instead of WindowGroupLimit if the
+            // window function is rank-like and Window partitionSpec is empty.
+            val existWindowGroupLimit =

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1476934238


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -69,10 +73,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   We can reuse `isExpandingWindow` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1473816605


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:

Review Comment:
   Shall we always exclude `SizeBasedWindowFunction`? Even before this PR, it seems wrong to insert `WindowGroupLimit` if the window operator contains `SizeBasedWindowFunction`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418275721


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   They are two windows, these two windows cannot be merged into one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1420501839


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,58 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =

Review Comment:
   updated



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimitSuite.scala:
##########
@@ -322,20 +323,122 @@ class InferWindowGroupLimitSuite extends PlanTest {
         testRelation
           .select(a, b, c,
             windowExpr(Rank(c :: Nil),
-              windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn"))
+              windowSpec(Nil, c.desc :: Nil, windowRowFrame)).as("rn"))
           .where(cond)
 
       val correctAnswer1 =
         testRelation
           .windowGroupLimit(Nil, c.desc :: Nil, Rank(c :: Nil), 2)
           .select(a, b, c,
             windowExpr(Rank(c :: Nil),
-              windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn"))
+              windowSpec(Nil, c.desc :: Nil, windowRowFrame)).as("rn"))
           .where(cond)
 
       comparePlans(
         Optimize.execute(originalQuery1.analyze),
         WithoutOptimize.execute(correctAnswer1.analyze))
     }
   }
+
+  test("Insert window group limit node for cumulative aggregation with limit") {
+    val originalQuery =
+      testRelation
+        .select(a, b, c,
+          windowExpr(sum(b),
+            windowSpec(a :: Nil, c.desc :: Nil, windowRangeFrame)).as("s"))
+        .limit(1)
+
+    val correctAnswer =
+      testRelation
+        .windowGroupLimit(a :: Nil, c.desc :: Nil, new Rank(), 1)
+        .select(a, b, c,
+          windowExpr(sum(b),
+            windowSpec(a :: Nil, c.desc :: Nil, windowRangeFrame)).as("s"))
+        .limit(1)
+    comparePlans(
+      Optimize.execute(originalQuery.analyze),
+      WithoutOptimize.execute(correctAnswer.analyze))
+  }
+
+  test("Insert window group limit node for partitionSpec is not empty and rank-like with limit") {
+    val originalQuery =
+      testRelation
+        .select(a, b, c,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, c.desc :: Nil, windowRowFrame)).as("rn"))
+        .limit(1)
+
+    val correctAnswer =
+      testRelation
+        .windowGroupLimit(a :: Nil, c.desc :: Nil, RowNumber(), 1)
+        .select(a, b, c,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, c.desc :: Nil, windowRowFrame)).as("rn"))
+        .limit(1)
+    comparePlans(
+      Optimize.execute(originalQuery.analyze),
+      WithoutOptimize.execute(correctAnswer.analyze))
+  }
+
+  test("Insert window group limit node for partitionSpec is not empty and all the orderSpec are " +
+    "foldable and all the window expressions are RowFrame with limit") {
+    val originalQuery =
+      testRelation
+        .select(a, b, c,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, Literal(1).desc :: Nil, windowRowFrame)).as("rn1"),
+          windowExpr(Rank(Literal(1) :: Nil),
+            windowSpec(a :: Nil, Literal(1).desc :: Nil, windowRowFrame)).as("rn2"))
+        .limit(1)
+
+    val correctAnswer =
+      testRelation
+        .windowGroupLimit(a :: Nil, Literal(1).desc :: Nil, RowNumber(), 1)
+        .select(a, b, c,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, Literal(1).desc :: Nil, windowRowFrame)).as("rn1"),
+          windowExpr(new Rank(),
+            windowSpec(a :: Nil, Literal(1).desc :: Nil, windowRowFrame)).as("rn2"))
+        .limit(1)
+    comparePlans(
+      Optimize.execute(originalQuery.analyze),
+      WithoutOptimize.execute(correctAnswer.analyze))
+  }
+
+  test("Limit unsupported") {

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1417192950


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&

Review Comment:
   yea, I will remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1416577802


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,24 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:
  * {{{
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn = 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 = rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn < 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 > rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn <= 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 >= rn
+ *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 limit 5

Review Comment:
   First, the effect of this case is the same as top-k, such as the test results of q67-changed above. This is beneficial, assuming the per-group data size is large. Secondly, use `RankLimitIterator` will slightly affect the efficiency of `ROW_NUMBER()`. Using `LimitPushDownThroughWindow` if `LimitPushDownThroughWindow` support this case,  it should have better performance. 
   If my understanding is correct, my next PR will make `LimitPushDownThroughWindow` support Top n (Limit + Sort) with partitionSpec is not empy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418204345


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true

Review Comment:
   We can add `&& (!support(windowFunction) || window.partitionSpec.nonEmpty)` here.
   So that we can remove `!LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1419834990


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   Yes. `sum_value2` will be correct if select `RankLimitIterator`.
   But what I want know is the result of `sum_value1`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1421267218


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   Limit outside of window can be early pruned by `WindowGroupLimit`, the following three conditions must be met:
   1.Window frame: first lower/upper must be 'UnboundedPreceding'/'CurrentRow', secondly window orderSpec exists unfoldable one or all window expressions are `RowFrame`. Because when orderSpec is foldable and window expressions is `RangeFrame`, aggregation calculation requires the use of all rows in the window group.
   2.Window function: All window expressions should not have `SizeBasedWindowFunction`. Because aggregation calculation of `SizeBasedWindowFunction` same requires the use of all rows in the window group.
   3.The Limit could not be pushed down through Window. Because `LimitPushDownThroughWindow` have better performance than `WindowGroupLimit`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1417142565


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&

Review Comment:
   It seems `LimitPushDownThroughWindow` already pushdown `Limit` through `Window` before arrive `InferWindowGroupLimit`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&
+        limit < conf.topKSortFallbackThreshold)
+
+  private def supportsPushdownThroughWindow(

Review Comment:
   We can change the method of LimitPushDownThroughWindow to `private[catalyst]` and avoid copy it.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -58,8 +61,9 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
    */
   private def isExpandingWindow(
       windowExpression: NamedExpression): Boolean = windowExpression match {
-    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
-    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true

Review Comment:
   We can't delete these directly, please do not affect the origin logic.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&

Review Comment:
   Why need this check? What will happen if all the `orderSpec` are foldable.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -58,8 +61,9 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
    */
   private def isExpandingWindow(
       windowExpression: NamedExpression): Boolean = windowExpression match {
-    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
-    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true

Review Comment:
   You can add a parameter to distinguish.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418256276


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&

Review Comment:
   InferWindowGroupLimitSuite.test("Insert window group limit node for top-k computation: empty relation") tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418279638


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1426081697


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   The calculation of top k of window functions only depends on the rows with rank <= k. The above three points are the judgment logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1420494712


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,58 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) || window.windowExpressions.forall(isRowFrame)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&
+            // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+            // window function is rank-like and Window partitionSpec is empty.
+            (!support(windowFunction) || window.partitionSpec.nonEmpty) => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }

Review Comment:
   removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #44145:
URL: https://github.com/apache/spark/pull/44145#issuecomment-1842376546

   Although I understood the mechanism, please add more detail in the PR description. So as contributors could understand.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1419847555


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   Please add the test case similar
   `df.withColumn("sum_value1", sum("value").over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).withColumn("sum_value2", sum("value").over(window.rangeBetween(Window.unboundedPreceding, Window.currentRow))).limit(1)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418273111


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   `df.withColumn("sum_value1", sum("value").over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).withColumn("sum_value2", sum("value").over(window)).limit(1)` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1476932669


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -69,10 +73,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   We should remove the three lines now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #44145:
URL: https://github.com/apache/spark/pull/44145#issuecomment-1848396570

   @zml1206 The PR description is not clear enough. Please modify
   `We can extract the limit value 2 (("a", 4, ""),("a", 4, "")) on window group a because window expression is RangeFrame and after sort, the sort values ​​of the first row and the second row are the same, limit value 1 (("b", 1, "h")) on window group b in WindowGroupLimitExec.`
   You can change to `We infer the rank() on window group a because window expression is RangeFrame and after sort. WindowGroupLimitExec will select `RankLimitIterator` to select the range values. We infer the rownumber() on window group b. `WindowGroupLimitExec` will select `SimpleLimitIterator` to select the row values.`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418887819


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,136 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
+        Seq(-1, 100).foreach { threshold =>
+          withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
+            // RowFrame
+            val existWindowGroupLimitRowFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+                .limit(1)
+                .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }

Review Comment:
   Please move the test about plan to `InferWindowGroupLimitSuite`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1419843946


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   `sum_value1` not equal `sum_value2` if exist same rows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1417173216


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&

Review Comment:
   If `orderSpec` are foldable, equivalent orderSpec is `Nil`, function calculation requires all data of each window group.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418226850


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&

Review Comment:
   Could you give an example?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418209839


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   requires all the frame is `RangeFrame` ?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =

Review Comment:
   `rankLikeFunction` -> `selectRankLikeFunction`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1473987561


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:

Review Comment:
   @zml1206 It's my mistake. Let me fix it first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1477302278


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -69,10 +73,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   Got it. Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418930283


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&

Review Comment:
   yea, I didn't think it through, I'll make changes right away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418257351


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =

Review Comment:
   updated



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418231837


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   Take an example, aggregate function `sum(a) with row frame` and another `sum(a) with range frame`.
   Now we select the `Rank`.
   Is the output of `sum(a) with row frame` correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418921431


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   Split Window only with partition spec, order spec and window function type is right. But it also need  select `RankLimitIterator`, otherwise sum_value2 may be wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #44145:
URL: https://github.com/apache/spark/pull/44145#issuecomment-1848225342

   cc @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1419845038


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   `WindowGroupLimit` only limits the window group and does not perform value calculation. The value calculation is still done by `Window`, so we only need to ensure the security of `WindowGroupLimit`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1419845038


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   
   `WindowGroupLimit` only limits the partition and does not perform value calculation. The value calculation is still done by `Window`, so we only need to ensure the security of `WindowGroupLimit`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1473898334


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:

Review Comment:
   
   Yea, I verified locally that there is indeed a problem. Start a new PR to fix it, or fix it in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1473972315


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:

Review Comment:
   Start a new PR is better? Because bug is caused by rank-like filter throuth window, this pr is limit throuth window.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1476028600


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -18,20 +18,24 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative

Review Comment:
   ```suggestion
    * and the function results are further filtered by limit-like predicates or an actual limit.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1474031237


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:

Review Comment:
   @zml1206 OK. Please commit that PR first. After that PR merged, this PR go on.
   Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1421239656


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:
  * {{{
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn = 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 = rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn < 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 > rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn <= 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 >= rn
+ *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 LIMIT 5
+ *   SELECT *, SUM(b) OVER(PARTITION BY k ORDER BY a) AS s FROM Tab1 LIMIT 5
+ *   SELECT *, SUM(b) OVER(ORDER BY a) AS s FROM Tab1 LIMIT 5

Review Comment:
   Please add the test case into `InferWindowGroupLimitSuite`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1421248218


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:
  * {{{
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn = 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 = rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn < 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 > rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn <= 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 >= rn
+ *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 LIMIT 5
+ *   SELECT *, SUM(b) OVER(PARTITION BY k ORDER BY a) AS s FROM Tab1 LIMIT 5
+ *   SELECT *, SUM(b) OVER(ORDER BY a) AS s FROM Tab1 LIMIT 5

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on PR #44145:
URL: https://github.com/apache/spark/pull/44145#issuecomment-1848418182

   > @zml1206 The PR description is not clear enough. Please modify `We can extract the limit value 2 (("a", 4, ""),("a", 4, "")) on window group a because window expression is RangeFrame and after sort, the sort values ​​of the first row and the second row are the same, limit value 1 (("b", 1, "h")) on window group b in WindowGroupLimitExec.` You can change to `We infer the rank() on window group a because window expression is RangeFrame and after sort. WindowGroupLimitExec will select RankLimitIterator to select the range values. We infer the rownumber() on window group b. WindowGroupLimitExec will select SimpleLimitIterator to select the row values.`
   
   The description is a bit ambiguous. I made some changes. Please see what you think about it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1417176550


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -58,8 +61,9 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
    */
   private def isExpandingWindow(
       windowExpression: NamedExpression): Boolean = windowExpression match {
-    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
-    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true

Review Comment:
   
   I have also found problems with this implementation before, I will make changes later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418239659


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   
   There is a problem with this example. Another example, `df.withColumn("rn", row_number().over(window)).withColumn("sum_value", sum("value").over(window)).limit(1) `. row_number is row frame, sum is range frame, We need to select `RankLimitIterator`. If select `SimpleLimitIterator`, sum value data will be wrong when the same row exists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418275721


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   They are two window.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1416577802


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,24 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:
  * {{{
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn = 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 = rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn < 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 > rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn <= 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 >= rn
+ *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 limit 5

Review Comment:
   Thanks. First, the effect of this case is the same as top-k, such as the test results of q67-changed above. This is beneficial, assuming the per-group data size is large. Secondly, I should update to distinguish if it only contains row_number/rank/dens_rank, choose SimpleLimitIterator. what do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1417209744


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&

Review Comment:
   Sounds reasonable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1476932669


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -69,10 +73,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   We should remove the three lines now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418881952


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&

Review Comment:
   After a second thought, shall we allow all the `orderSpec` are foldable if all the window expressions with `RowFrame`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   No. split Window only with partition spec, order spec and window function type. The window frame is not participant.
   
   `df.withColumn("sum_value1", sum("value").over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).withColumn("sum_value2", sum("value").over(window.rangeBetween(Window.unboundedPreceding, Window.currentRow))).limit(1)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1419861725


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,147 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
+        Seq(-1, 100).foreach { threshold =>
+          withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
+            // RowFrame
+            val existWindowGroupLimitRowFrame =

Review Comment:
   Relevant tests have been added to `InferWindowGroupLimitSuite`, I will remove the redundant ones in `DataFrameWindowFunctionsSuite`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1474001014


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:

Review Comment:
   
   I have fixed it locally and added UT. How about I submit it? @beliefer 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1473898334


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:

Review Comment:
   Start a new PR to fix it, or fix it in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1417145168


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&
+        limit < conf.topKSortFallbackThreshold)
+
+  private def supportsPushdownThroughWindow(

Review Comment:
   We can change the method of `LimitPushDownThroughWindow` to `private[catalyst]` and avoid copy it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on PR #44145:
URL: https://github.com/apache/spark/pull/44145#issuecomment-1842460913

   > Although I understood the mechanism, please add more detail in the PR description. So as other contributors could understand.
   
   Done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418250683


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&

Review Comment:
   `df.withColumn("rn", row_number().over(Window.orderBy($"order".asc_nulls_first)))
                 .limit(10)
                 .where("rn <= 5")`
    Because filter in `InferWindowGroupLimit` is possible to convert filter to limit. Code is 
   ```
   case (limit, rankLikeFunction) if limit <= conf.windowGroupLimitThreshold &&
                 child.maxRows.forall(_ > limit) =>
                 if (limit > 0) {
                   val newFilterChild = if (rankLikeFunction.isInstanceOf[RowNumber] &&
                     partitionSpec.isEmpty && limit < conf.topKSortFallbackThreshold) {
                     // Top n (Limit + Sort) have better performance than WindowGroupLimit if the
                     // window function is RowNumber and Window partitionSpec is empty.
                     Limit(Literal(limit), window)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1417465021


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&

Review Comment:
   
   No, after `LimitPushDownThroughWindow` is pushed down limit, the new plan will still hit `InferWindowGroupLimit`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418213459


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   requires all the frame is `RowFrame`. As long as `Rangeframe` exists, press `Rangeframe`. `Rangeframe` requires more data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418264040


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,57 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.windowExpressions.forall(isExpandingWindow) &&
+      window.orderSpec.exists(!_.foldable) &&
+      // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+      // window function is Rank, DenseRank and RowNumber, and Window partitionSpec is empty.
+      !(window.partitionSpec.isEmpty && supportsPushdownThroughWindow(window.windowExpressions) &&

Review Comment:
   `InferWindowGroupLimitSuite` not contains `LimitPushDownThroughWindow` before `InferWindowGroupLimit`.
   Could you test with `DataFrameWindowFunctionsSuite`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1421267218


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   Limit outside of window can be early pruned by `WindowGroupLimit`, the following three conditions must be met:
   1.Window frame: first lower/upper must be 'UnboundedPreceding'/'CurrentRow', secondly window orderSpec exists unfoldable one or all window expressions are `RowFrame`. Because when orderSpec is foldable and window expressions is `RangeFrame`, aggregation calculation requires the use of all rows in the window group.
   2.Window function: All window expressions should not have `SizeBasedWindowFunction`, for example `CumeDist`, `NTile`, `PercentRank`. Because aggregation calculation of `SizeBasedWindowFunction` requires the use of all rows in the window group.
   3.The Limit could not be pushed down through Window. Because `LimitPushDownThroughWindow` have better performance than `WindowGroupLimit`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1416577802


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,24 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:
  * {{{
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn = 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 = rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn < 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 > rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn <= 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 >= rn
+ *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 limit 5

Review Comment:
   Thanks. First, the effect of this case is the same as top-k, such as the test results of q67-changed above. This is beneficial, assuming the per-group data size is large. Secondly, use `RankLimitIterator` will slightly affect the efficiency of `ROW_NUMBER()`. Using `LimitPushDownThroughWindow` if `LimitPushDownThroughWindow` support this case,  it should have better performance. 
   If my understanding is correct, my next PR will make `LimitPushDownThroughWindow` support Top n (Limit + Sort) with partitionSpec is not empy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1476978831


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -69,10 +73,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   There is a difference, it can also be rangeframe here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1416530396


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,24 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:
  * {{{
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn = 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 = rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn < 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 > rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn <= 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 >= rn
+ *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 limit 5

Review Comment:
   I guess using WindowGroupLimit for this case works bad.
   Maybe I missing some thing, could you explain it in detail?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -17,21 +17,24 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, LIMIT, WINDOW}
 
 /**
  * Inserts a `WindowGroupLimit` below `Window` if the `Window` has rank-like functions
- * and the function results are further filtered by limit-like predicates. Example query:
+ * and the function results are further filtered by limit-like predicates or cumulative
+ * aggregation with limit excludes `SizeBasedWindowFunction`. Example query:
  * {{{
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn = 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 = rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn < 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 > rn
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE rn <= 5
  *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 WHERE 5 >= rn
+ *   SELECT *, ROW_NUMBER() OVER(PARTITION BY k ORDER BY a) AS rn FROM Tab1 limit 5

Review Comment:
   I guess using `WindowGroupLimit` for this case works bad.
   Maybe I missing some thing, could you explain it in detail?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418213459


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   requires all the frame is `RowFrame`. As long as `RowFrame` exists, press `Rangeframe`. `Rangeframe` requires more data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1418887819


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,136 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
+        Seq(-1, 100).foreach { threshold =>
+          withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
+            // RowFrame
+            val existWindowGroupLimitRowFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+                .limit(1)
+                .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }

Review Comment:
   Please move the test about plan to `InferWindowGroupLimitSuite`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1419834990


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   Yes. `sum_value2` will be correct is select `RankLimitIterator`.
   But what I want know is the result of `sum_value1`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1419851792


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +71,56 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      window.orderSpec.exists(!_.foldable) &&
+      !LimitPushDownThroughWindow.supportsPushdownThroughWindow(window.windowExpressions) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }
+
+  private def rankLikeFunction(windowExpressions: Seq[NamedExpression]): Expression =
+    // If windowExpressions all are RowFrame, choose SimpleLimitIterator,
+    // else RankLimitIterator to obtain enough rows for ensure data accuracy.
+    if (windowExpressions.forall(isRowFrame)) {
+      new RowNumber
+    } else {
+      new Rank

Review Comment:
   OK, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for cumulative aggregation with limit [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1420347586


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,58 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) || window.windowExpressions.forall(isRowFrame)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&
+            // LimitPushDownThroughWindow have better performance than WindowGroupLimit if the
+            // window function is rank-like and Window partitionSpec is empty.
+            (!support(windowFunction) || window.partitionSpec.nonEmpty) => true
+        case _ => false
+      }
+
+  private def isRowFrame(windowExpression: NamedExpression): Boolean = windowExpression match {
+    case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
+    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
+    case _ => false
+  }

Review Comment:
   The code of `isRowFrame` is the same as `isExpandingWindow`. Please remove this one.



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,111 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {

Review Comment:
   Other test cases covers this config. Please remove it.



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,147 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
+        Seq(-1, 100).foreach { threshold =>
+          withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
+            // RowFrame
+            val existWindowGroupLimitRowFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+                .limit(1)
+                .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }
+            if (threshold == -1) {
+              assert(!existWindowGroupLimitRowFrame)
+            } else {
+              assert(existWindowGroupLimitRowFrame)
+            }
+            checkAnswer(df.withColumn("rn", row_number().over(window)).limit(1),
+              Seq(
+                Row("a", 4, "", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", rank().over(window2)).limit(7),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 3),
+                Row("a", 2, "y", 2),
+                Row("a", 3, "z", 1),
+                Row("a", 4, "", 5),
+                Row("a", 4, "", 5),
+                Row("b", 1, "n", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", dense_rank().over(window3)).limit(11),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 7),
+                Row("a", 2, "y", 8),
+                Row("a", 3, "z", 9),
+                Row("a", 4, "", 2),
+                Row("a", 4, "", 2),
+                Row("b", 1, "h", 5),
+                Row("b", 1, "n", 6),
+                Row("c", 1, "a", 3),
+                Row("c", 1, "z", 9),
+                Row("c", 2, nullStr, 1)
+              )
+            )
+
+            // RangeFrame
+            val existWindowGroupLimitRangeFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+              .limit(1)
+              .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }
+            if (threshold == -1) {
+              assert(!existWindowGroupLimitRangeFrame)
+            } else {
+              assert(existWindowGroupLimitRangeFrame)
+            }
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window)).limit(1),
+              Seq(
+                Row("a", 4, "", 8)
+              )
+            )
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window2)).limit(7),
+              Seq(
+                Row("a", 0, "c", 6),
+                Row("a", 1, "x", 6),
+                Row("a", 2, "y", 5),
+                Row("a", 3, "z", 3),
+                Row("a", 4, "", 14),
+                Row("a", 4, "", 14),
+                Row("b", 1, "n", 1)
+              )
+            )
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window3)).limit(11),
+              Seq(
+                Row("a", 0, "c", 11),
+                Row("a", 1, "x", 14),
+                Row("a", 2, "y", 16),
+                Row("a", 3, "z", 20),
+                Row("a", 4, "", 10),
+                Row("a", 4, "", 10),
+                Row("b", 1, "h", 12),
+                Row("b", 1, "n", 13),
+                Row("c", 1, "a", 11),
+                Row("c", 1, "z", 20),
+                Row("c", 2, nullStr, 2)
+              )
+            )
+
+            // Both RowFrame and RangeFrame exist
+            checkAnswer(
+              df.withColumn("sum_value1", sum("value")
+                .over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
+              .withColumn("sum_value2", sum("value")
+                .over(window.rangeBetween(Window.unboundedPreceding, Window.currentRow))).limit(1),
+              Seq(
+                Row("a", 4, "", 4, 8)

Review Comment:
   Shall we add another group with limit 7 ?



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimitSuite.scala:
##########
@@ -322,20 +323,122 @@ class InferWindowGroupLimitSuite extends PlanTest {
         testRelation
           .select(a, b, c,
             windowExpr(Rank(c :: Nil),
-              windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn"))
+              windowSpec(Nil, c.desc :: Nil, windowRowFrame)).as("rn"))
           .where(cond)
 
       val correctAnswer1 =
         testRelation
           .windowGroupLimit(Nil, c.desc :: Nil, Rank(c :: Nil), 2)
           .select(a, b, c,
             windowExpr(Rank(c :: Nil),
-              windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn"))
+              windowSpec(Nil, c.desc :: Nil, windowRowFrame)).as("rn"))
           .where(cond)
 
       comparePlans(
         Optimize.execute(originalQuery1.analyze),
         WithoutOptimize.execute(correctAnswer1.analyze))
     }
   }
+
+  test("Insert window group limit node for cumulative aggregation with limit") {
+    val originalQuery =
+      testRelation
+        .select(a, b, c,
+          windowExpr(sum(b),
+            windowSpec(a :: Nil, c.desc :: Nil, windowRangeFrame)).as("s"))
+        .limit(1)
+
+    val correctAnswer =
+      testRelation
+        .windowGroupLimit(a :: Nil, c.desc :: Nil, new Rank(), 1)
+        .select(a, b, c,
+          windowExpr(sum(b),
+            windowSpec(a :: Nil, c.desc :: Nil, windowRangeFrame)).as("s"))
+        .limit(1)
+    comparePlans(
+      Optimize.execute(originalQuery.analyze),
+      WithoutOptimize.execute(correctAnswer.analyze))
+  }
+
+  test("Insert window group limit node for partitionSpec is not empty and rank-like with limit") {
+    val originalQuery =
+      testRelation
+        .select(a, b, c,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, c.desc :: Nil, windowRowFrame)).as("rn"))
+        .limit(1)
+
+    val correctAnswer =
+      testRelation
+        .windowGroupLimit(a :: Nil, c.desc :: Nil, RowNumber(), 1)
+        .select(a, b, c,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, c.desc :: Nil, windowRowFrame)).as("rn"))
+        .limit(1)
+    comparePlans(
+      Optimize.execute(originalQuery.analyze),
+      WithoutOptimize.execute(correctAnswer.analyze))
+  }
+
+  test("Insert window group limit node for partitionSpec is not empty and all the orderSpec are " +
+    "foldable and all the window expressions are RowFrame with limit") {
+    val originalQuery =
+      testRelation
+        .select(a, b, c,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, Literal(1).desc :: Nil, windowRowFrame)).as("rn1"),
+          windowExpr(Rank(Literal(1) :: Nil),
+            windowSpec(a :: Nil, Literal(1).desc :: Nil, windowRowFrame)).as("rn2"))
+        .limit(1)
+
+    val correctAnswer =
+      testRelation
+        .windowGroupLimit(a :: Nil, Literal(1).desc :: Nil, RowNumber(), 1)
+        .select(a, b, c,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, Literal(1).desc :: Nil, windowRowFrame)).as("rn1"),
+          windowExpr(new Rank(),
+            windowSpec(a :: Nil, Literal(1).desc :: Nil, windowRowFrame)).as("rn2"))
+        .limit(1)
+    comparePlans(
+      Optimize.execute(originalQuery.analyze),
+      WithoutOptimize.execute(correctAnswer.analyze))
+  }
+
+  test("Limit unsupported") {

Review Comment:
   Please make the test name clear.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,58 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * All window expressions should not have SizeBasedWindowFunction, all lower/upper of
+   * specifiedWindowFrame is UnboundedPreceding/CurrentRow, and window orderSpec is not foldable,
+   * so that we can safely do the early stop.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =

Review Comment:
   Please make comments clearer.
   ```
   /**
    * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
    * 1. The window orderSpec exists unfoldable one or all window expressions should use the same expanding window.
    * 2. All window expressions should not have SizeBasedWindowFunction.
    * 3. The Limit could be pushed down through Window.
    */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1420502309


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,111 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1420540736


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala:
##########
@@ -1276,6 +1276,147 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     )
   }
 
+  test("SPARK-46228: Insert window group limit node for cumulative aggregation with limit") {
+
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, "c"),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, ""),
+      ("a", 4, ""),
+      ("b", 1, "h"),
+      ("b", 1, "n"),
+      ("c", 1, "z"),
+      ("c", 1, "a"),
+      ("c", 2, nullStr)).toDF("key", "value", "order")
+    val window = Window.partitionBy($"key").orderBy($"order".asc_nulls_first)
+    val window2 = Window.partitionBy($"key").orderBy($"order".desc_nulls_first)
+    val window3 = Window.orderBy($"order".asc_nulls_first)
+
+    Seq(true, false).foreach { enableEvaluator =>
+      withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
+        Seq(-1, 100).foreach { threshold =>
+          withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
+            // RowFrame
+            val existWindowGroupLimitRowFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+                .limit(1)
+                .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }
+            if (threshold == -1) {
+              assert(!existWindowGroupLimitRowFrame)
+            } else {
+              assert(existWindowGroupLimitRowFrame)
+            }
+            checkAnswer(df.withColumn("rn", row_number().over(window)).limit(1),
+              Seq(
+                Row("a", 4, "", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", rank().over(window2)).limit(7),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 3),
+                Row("a", 2, "y", 2),
+                Row("a", 3, "z", 1),
+                Row("a", 4, "", 5),
+                Row("a", 4, "", 5),
+                Row("b", 1, "n", 1)
+              )
+            )
+            checkAnswer(df.withColumn("rn", dense_rank().over(window3)).limit(11),
+              Seq(
+                Row("a", 0, "c", 4),
+                Row("a", 1, "x", 7),
+                Row("a", 2, "y", 8),
+                Row("a", 3, "z", 9),
+                Row("a", 4, "", 2),
+                Row("a", 4, "", 2),
+                Row("b", 1, "h", 5),
+                Row("b", 1, "n", 6),
+                Row("c", 1, "a", 3),
+                Row("c", 1, "z", 9),
+                Row("c", 2, nullStr, 1)
+              )
+            )
+
+            // RangeFrame
+            val existWindowGroupLimitRangeFrame =
+              df.withColumn("sum_value", sum("value").over(window))
+              .limit(1)
+              .queryExecution.optimizedPlan.exists {
+                case _: WindowGroupLimit => true
+                case _ => false
+              }
+            if (threshold == -1) {
+              assert(!existWindowGroupLimitRangeFrame)
+            } else {
+              assert(existWindowGroupLimitRangeFrame)
+            }
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window)).limit(1),
+              Seq(
+                Row("a", 4, "", 8)
+              )
+            )
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window2)).limit(7),
+              Seq(
+                Row("a", 0, "c", 6),
+                Row("a", 1, "x", 6),
+                Row("a", 2, "y", 5),
+                Row("a", 3, "z", 3),
+                Row("a", 4, "", 14),
+                Row("a", 4, "", 14),
+                Row("b", 1, "n", 1)
+              )
+            )
+            checkAnswer(df.withColumn("sum_value", sum("value").over(window3)).limit(11),
+              Seq(
+                Row("a", 0, "c", 11),
+                Row("a", 1, "x", 14),
+                Row("a", 2, "y", 16),
+                Row("a", 3, "z", 20),
+                Row("a", 4, "", 10),
+                Row("a", 4, "", 10),
+                Row("b", 1, "h", 12),
+                Row("b", 1, "n", 13),
+                Row("c", 1, "a", 11),
+                Row("c", 1, "z", 20),
+                Row("c", 2, nullStr, 2)
+              )
+            )
+
+            // Both RowFrame and RangeFrame exist
+            checkAnswer(
+              df.withColumn("sum_value1", sum("value")
+                .over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
+              .withColumn("sum_value2", sum("value")
+                .over(window.rangeBetween(Window.unboundedPreceding, Window.currentRow))).limit(1),
+              Seq(
+                Row("a", 4, "", 4, 8)

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44145:
URL: https://github.com/apache/spark/pull/44145#discussion_r1421259543


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala:
##########
@@ -68,10 +72,55 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
 
+  /**
+   * Whether support inferring WindowGroupLimit from Limit outside of Window. Check if:
+   * 1. The window orderSpec exists unfoldable one or all window expressions should use the same
+   *  expanding window.
+   * 2. All window expressions should not have SizeBasedWindowFunction.
+   * 3. The Limit could not be pushed down through Window.
+   */
+  private def limitSupport(limit: Int, window: Window): Boolean =
+    limit <= conf.windowGroupLimitThreshold && window.child.maxRows.forall(_ > limit) &&
+      !window.child.isInstanceOf[WindowGroupLimit] &&
+      (window.orderSpec.exists(!_.child.foldable) ||
+        window.windowExpressions.forall(isExpandingWindow)) &&
+      window.windowExpressions.forall {
+        case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
+        SpecifiedWindowFrame(_, UnboundedPreceding, CurrentRow))), _)
+          if !windowFunction.isInstanceOf[SizeBasedWindowFunction] &&

Review Comment:
   I think this is the key here. Do you have an intuitive explanation about what window function can be early pruned by `WindowGroupLimit`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46228][SQL] Insert window group limit node for limit outside of window [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on PR #44145:
URL: https://github.com/apache/spark/pull/44145#issuecomment-1849247835

   
   I changed the description. Do you think it’s easier to understand this way? @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org