You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2022/06/23 00:08:35 UTC
[spark] branch branch-3.3 updated: [SPARK-38614][SQL] Don't push down limit through window that's using percent_rank
This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new bebfecb81da [SPARK-38614][SQL] Don't push down limit through window that's using percent_rank
bebfecb81da is described below
commit bebfecb81da2de33240d8ab37bd641985281844e
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Thu Jun 23 08:07:43 2022 +0800
[SPARK-38614][SQL] Don't push down limit through window that's using percent_rank
### What changes were proposed in this pull request?
Change `LimitPushDownThroughWindow` so that it no longer supports pushing down a limit through a window using percent_rank.
### Why are the changes needed?
Given a query with a limit of _n_ rows and a window whose child produces _m_ rows (where _m_ > _n_), pushing the limit below the window causes percent_rank to see only _n_ rows, so it labels the _n_th row as 100% rather than the _m_th row.
This behavior conflicts with Spark 3.1.3, Hive 2.3.9 and Prestodb 0.268.
#### Example
Assume this data:
```
create table t1 stored as parquet as
select *
from range(101);
```
And also assume this query:
```
select id, percent_rank() over (order by id) as pr
from t1
limit 3;
```
With Spark 3.2.1, 3.3.0, and master, the limit is applied before the percent_rank:
```
0 0.0
1 0.5
2 1.0
```
With Spark 3.1.3, Hive 2.3.9, and Prestodb 0.268, the limit is applied _after_ percent_rank:
Spark 3.1.3:
```
0 0.0
1 0.01
2 0.02
```
Hive 2.3.9:
```
0: jdbc:hive2://localhost:10000> select id, percent_rank() over (order by id) as pr
from t1
limit 3;
. . . . . . . . . . . . . . . .> . . . . . . . . . . . . . . . .> WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+-----+-------+
| id | pr |
+-----+-------+
| 0 | 0.0 |
| 1 | 0.01 |
| 2 | 0.02 |
+-----+-------+
3 rows selected (4.621 seconds)
0: jdbc:hive2://localhost:10000>
```
Prestodb 0.268:
```
id | pr
----+------
0 | 0.0
1 | 0.01
2 | 0.02
(3 rows)
```
With this PR, Spark will apply the limit after percent_rank.
### Does this PR introduce _any_ user-facing change?
No (besides changing percent_rank's behavior to be more like Spark 3.1.3, Hive, and Prestodb).
### How was this patch tested?
New unit tests.
Closes #36951 from bersprockets/percent_rank_issue.
Authored-by: Bruce Robbins <be...@gmail.com>
Signed-off-by: Yuming Wang <yu...@ebay.com>
(cherry picked from commit a63ce5676e79f15903e9fd533a26a6c3ec9bf7a8)
Signed-off-by: Yuming Wang <yu...@ebay.com>
---
.../sql/catalyst/optimizer/LimitPushDownThroughWindow.scala | 5 +++--
.../optimizer/LimitPushdownThroughWindowSuite.scala | 13 ++++++++++++-
.../apache/spark/sql/DataFrameWindowFunctionsSuite.scala | 13 +++++++++++++
3 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
index eaea167ee9f..635434741b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, IntegerLiteral, NamedExpression, RankLike, RowFrame, RowNumberLike, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, NTile, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Project, Sort, Window}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, WINDOW}
@@ -33,7 +33,8 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] {
// The window frame of RankLike and RowNumberLike can only be UNBOUNDED PRECEDING to CURRENT ROW.
private def supportsPushdownThroughWindow(
windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
- case Alias(WindowExpression(_: RankLike | _: RowNumberLike, WindowSpecDefinition(Nil, _,
+ case Alias(WindowExpression(_: Rank | _: DenseRank | _: NTile | _: RowNumber,
+ WindowSpecDefinition(Nil, _,
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
case _ => false
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
index f2c1f452d02..b09d10b2601 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{CurrentRow, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
+import org.apache.spark.sql.catalyst.expressions.{CurrentRow, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -187,4 +187,15 @@ class LimitPushdownThroughWindowSuite extends PlanTest {
Optimize.execute(originalQuery.analyze),
WithoutOptimize.execute(originalQuery.analyze))
}
+
+ test("SPARK-38614: Should not push through percent_rank window function") {
+ val originalQuery = testRelation
+ .select(a, b, c,
+ windowExpr(new PercentRank(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn"))
+ .limit(2)
+
+ comparePlans(
+ Optimize.execute(originalQuery.analyze),
+ WithoutOptimize.execute(originalQuery.analyze))
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 25d676f5d93..557b278f9c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1190,4 +1190,17 @@ class DataFrameWindowFunctionsSuite extends QueryTest
)
)
}
+
+ test("SPARK-38614: percent_rank should apply before limit") {
+ val df = Seq.tabulate(101)(identity).toDF("id")
+ val w = Window.orderBy("id")
+ checkAnswer(
+ df.select($"id", percent_rank().over(w)).limit(3),
+ Seq(
+ Row(0, 0.0d),
+ Row(1, 0.01d),
+ Row(2, 0.02d)
+ )
+ )
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org