Posted to commits@spark.apache.org by we...@apache.org on 2023/03/15 12:01:44 UTC
[spark] branch master updated: [SPARK-42783][SQL] Infer window group limit should run as late as possible
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d9d4b593533 [SPARK-42783][SQL] Infer window group limit should run as late as possible
d9d4b593533 is described below
commit d9d4b59353349e30135cdee718e18a129547ff41
Author: Jiaan Geng <be...@163.com>
AuthorDate: Wed Mar 15 20:01:21 2023 +0800
[SPARK-42783][SQL] Infer window group limit should run as late as possible
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/40142 included an unrelated change that is actually a regression: it made the Infer window group limit rule run early.
Infer window group limit should run as late as possible, because that is safer.
### Why are the changes needed?
Infer window group limit should run as late as possible.
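As a rough illustration only, the guard that the diff below moves into the `case` pattern can be modelled in plain Scala without any Spark dependency. `WindowGroupLimitGuard` and `shouldInfer` are hypothetical names for this sketch; only the condition `limit <= threshold && maxRows.forall(_ > limit)` is taken from the patch.

```scala
// Hypothetical stand-alone model of the guard; these are not Spark's classes.
object WindowGroupLimitGuard {
  // maxRows stands in for child.maxRows in the patch: None means "row count unknown".
  // threshold stands in for conf.windowGroupLimitThreshold.
  def shouldInfer(limit: Int, threshold: Int, maxRows: Option[Long]): Boolean =
    // Option.forall is true for None, so an unknown row count never blocks the rule.
    limit <= threshold && maxRows.forall(_ > limit)
}
```

Under this model, a child known to produce no more rows than the limit skips the rewrite entirely, while an unknown row count leaves the decision to the threshold check alone.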
### Does this PR introduce _any_ user-facing change?
'No'.
New feature.
### How was this patch tested?
Existing test cases.
Manually generated the micro benchmark.
```
Benchmark Top-K:                                                    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
--------------------------------------------------------------------------------------------------------------------------------------------
ROW_NUMBER (PARTITION: , WindowGroupLimit: false)                           10972         11739        765        1.9        523.2      1.0X
ROW_NUMBER (PARTITION: , WindowGroupLimit: true)                             1700          1738         29       12.3         81.0      6.5X
ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: false)             24317         24452        113        0.9       1159.5      0.5X
ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: true)               6608          6965        348        3.2        315.1      1.7X
RANK (PARTITION: , WindowGroupLimit: false)                                 11549         11850        160        1.8        550.7      1.0X
RANK (PARTITION: , WindowGroupLimit: true)                                   2916          3211        267        7.2        139.1      3.8X
RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)                   24736         25951        565        0.8       1179.5      0.4X
RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)                     6825          7256        497        3.1        325.5      1.6X
DENSE_RANK (PARTITION: , WindowGroupLimit: false)                           11857         12513        652        1.8        565.4      0.9X
DENSE_RANK (PARTITION: , WindowGroupLimit: true)                             2721          2937        113        7.7        129.8      4.0X
DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)             24976         25686        760        0.8       1191.0      0.4X
DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)               6568          6884        364        3.2        313.2      1.7X
```
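For readers of the table, the `Relative` column appears to be the first row's best time divided by each row's best time. That reading is an inference from the numbers above, not something stated in the commit message. A small sketch that reproduces a few of the values (`RelativeColumn` and `relative` are hypothetical names):

```scala
// Hypothetical helper; the formula is inferred from the table, not from Spark's source.
object RelativeColumn {
  // Best time (ms) of the first row: ROW_NUMBER without partitioning, WindowGroupLimit off.
  val baselineBestMs: Double = 10972.0

  // Speed-up of a case relative to the baseline; values above 1.0 mean faster.
  def relative(bestMs: Double): Double = baselineBestMs / bestMs
}
```

For example, `RelativeColumn.relative(1700)` is about 6.45, which matches the 6.5X reported for `ROW_NUMBER` with `WindowGroupLimit: true`, and `RelativeColumn.relative(24317)` is about 0.45, matching the 0.5X of the partitioned case without the optimization.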
Closes #40410 from beliefer/SPARK-42783.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala | 6 +++---
.../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 1 -
.../main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala | 5 +++++
3 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
index 8e3dc662205..261be291463 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
@@ -84,11 +84,11 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
}
// Pick a rank-like function with the smallest limit
selectedLimits.minBy(_._1) match {
- case (limit, rankLikeFunction) if limit <= conf.windowGroupLimitThreshold =>
+ case (limit, rankLikeFunction) if limit <= conf.windowGroupLimitThreshold &&
+ child.maxRows.forall(_ > limit) =>
if (limit > 0) {
val newFilterChild = if (rankLikeFunction.isInstanceOf[RowNumber] &&
- partitionSpec.isEmpty && child.maxRows.forall(_ > limit) &&
- limit < conf.topKSortFallbackThreshold) {
+ partitionSpec.isEmpty && limit < conf.topKSortFallbackThreshold) {
// Top n (Limit + Sort) have better performance than WindowGroupLimit if the
// window function is RowNumber and Window partitionSpec is empty.
Limit(Literal(limit), window)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ca55a281605..a0d49c29470 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -130,7 +130,6 @@ abstract class Optimizer(catalogManager: CatalogManager)
val operatorOptimizationBatch: Seq[Batch] = {
Batch("Operator Optimization before Inferring Filters", fixedPoint,
operatorOptimizationRuleSet: _*) ::
- Batch("Infer window group limit", Once, InferWindowGroupLimit) ::
Batch("Infer Filters", Once,
InferFiltersFromGenerate,
InferFiltersFromConstraints) ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 8c420838ca2..f05fe9d60fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -84,6 +84,11 @@ class SparkOptimizer(
PushPredicateThroughNonJoin,
PushProjectionThroughLimit,
RemoveNoopOperators) :+
+ Batch("Infer window group limit", Once,
+ InferWindowGroupLimit,
+ LimitPushDown,
+ LimitPushDownThroughWindow,
+ EliminateLimits) :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) :+
Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)