You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/03/15 12:01:44 UTC

[spark] branch master updated: [SPARK-42783][SQL] Infer window group limit should run as late as possible

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9d4b593533 [SPARK-42783][SQL] Infer window group limit should run as late as possible
d9d4b593533 is described below

commit d9d4b59353349e30135cdee718e18a129547ff41
Author: Jiaan Geng <be...@163.com>
AuthorDate: Wed Mar 15 20:01:21 2023 +0800

    [SPARK-42783][SQL] Infer window group limit should run as late as possible
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/40142 included an unrelated change that is actually a regression: it made the Infer window group limit rule run early.
    
    Infer window group limit should run as late as possible, because that is safer.
    
    ### Why are the changes needed?
    Infer window group limit should run as late as possible.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    New feature.
    
    ### How was this patch tested?
    Existing test cases.
    
    Manually generated the micro benchmark.
    
    ```
    Benchmark Top-K:                                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    -----------------------------------------------------------------------------------------------------------------------------------------------
    ROW_NUMBER (PARTITION: , WindowGroupLimit: false)                        10972          11739         765          1.9         523.2       1.0X
    ROW_NUMBER (PARTITION: , WindowGroupLimit: true)                          1700           1738          29         12.3          81.0       6.5X
    ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: false)          24317          24452         113          0.9        1159.5       0.5X
    ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: true)            6608           6965         348          3.2         315.1       1.7X
    RANK (PARTITION: , WindowGroupLimit: false)                              11549          11850         160          1.8         550.7       1.0X
    RANK (PARTITION: , WindowGroupLimit: true)                                2916           3211         267          7.2         139.1       3.8X
    RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)                24736          25951         565          0.8        1179.5       0.4X
    RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)                  6825           7256         497          3.1         325.5       1.6X
    DENSE_RANK (PARTITION: , WindowGroupLimit: false)                        11857          12513         652          1.8         565.4       0.9X
    DENSE_RANK (PARTITION: , WindowGroupLimit: true)                          2721           2937         113          7.7         129.8       4.0X
    DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)          24976          25686         760          0.8        1191.0       0.4X
    DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)            6568           6884         364          3.2         313.2       1.7X
    
    ```
    
    Closes #40410 from beliefer/SPARK-42783.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala | 6 +++---
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala   | 1 -
 .../main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala  | 5 +++++
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
index 8e3dc662205..261be291463 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
@@ -84,11 +84,11 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
           }
           // Pick a rank-like function with the smallest limit
           selectedLimits.minBy(_._1) match {
-            case (limit, rankLikeFunction) if limit <= conf.windowGroupLimitThreshold =>
+            case (limit, rankLikeFunction) if limit <= conf.windowGroupLimitThreshold &&
+              child.maxRows.forall(_ > limit) =>
               if (limit > 0) {
                 val newFilterChild = if (rankLikeFunction.isInstanceOf[RowNumber] &&
-                  partitionSpec.isEmpty && child.maxRows.forall(_ > limit) &&
-                  limit < conf.topKSortFallbackThreshold) {
+                  partitionSpec.isEmpty && limit < conf.topKSortFallbackThreshold) {
                   // Top n (Limit + Sort) have better performance than WindowGroupLimit if the
                   // window function is RowNumber and Window partitionSpec is empty.
                   Limit(Literal(limit), window)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ca55a281605..a0d49c29470 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -130,7 +130,6 @@ abstract class Optimizer(catalogManager: CatalogManager)
     val operatorOptimizationBatch: Seq[Batch] = {
       Batch("Operator Optimization before Inferring Filters", fixedPoint,
         operatorOptimizationRuleSet: _*) ::
-      Batch("Infer window group limit", Once, InferWindowGroupLimit) ::
       Batch("Infer Filters", Once,
         InferFiltersFromGenerate,
         InferFiltersFromConstraints) ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 8c420838ca2..f05fe9d60fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -84,6 +84,11 @@ class SparkOptimizer(
       PushPredicateThroughNonJoin,
       PushProjectionThroughLimit,
       RemoveNoopOperators) :+
+    Batch("Infer window group limit", Once,
+      InferWindowGroupLimit,
+      LimitPushDown,
+      LimitPushDownThroughWindow,
+      EliminateLimits) :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) :+
     Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org