You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2022/06/23 00:08:35 UTC

[spark] branch branch-3.3 updated: [SPARK-38614][SQL] Don't push down limit through window that's using percent_rank

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new bebfecb81da [SPARK-38614][SQL] Don't push down limit through window that's using percent_rank
bebfecb81da is described below

commit bebfecb81da2de33240d8ab37bd641985281844e
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Thu Jun 23 08:07:43 2022 +0800

    [SPARK-38614][SQL] Don't push down limit through window that's using percent_rank
    
    ### What changes were proposed in this pull request?
    
    Change `LimitPushDownThroughWindow` so that it no longer supports pushing down a limit through a window using percent_rank.
    
    ### Why are the changes needed?
    
    Given a query with a limit of _n_ rows, and a window whose child produces _m_ rows, percent_rank will label the _nth_ row as 100% rather than the _mth_ row.
    
    This behavior conflicts with Spark 3.1.3, Hive 2.3.9 and Prestodb 0.268.
    
    #### Example
    
    Assume this data:
    ```
    create table t1 stored as parquet as
    select *
    from range(101);
    ```
    And also assume this query:
    ```
    select id, percent_rank() over (order by id) as pr
    from t1
    limit 3;
    ```
    With Spark 3.2.1, 3.3.0, and master, the limit is applied before the percent_rank:
    ```
    0       0.0
    1       0.5
    2       1.0
    ```
    With Spark 3.1.3, and Hive 2.3.9, and Prestodb 0.268, the limit is applied _after_ percent_rank:
    
    Spark 3.1.3:
    ```
    0       0.0
    1       0.01
    2       0.02
    ```
    Hive 2.3.9:
    ```
    0: jdbc:hive2://localhost:10000> select id, percent_rank() over (order by id) as pr
    from t1
    limit 3;
    . . . . . . . . . . . . . . . .> . . . . . . . . . . . . . . . .> WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
    +-----+-------+
    | id  |  pr   |
    +-----+-------+
    | 0   | 0.0   |
    | 1   | 0.01  |
    | 2   | 0.02  |
    +-----+-------+
    3 rows selected (4.621 seconds)
    0: jdbc:hive2://localhost:10000>
    ```
    
    Prestodb 0.268:
    ```
     id |  pr
    ----+------
      0 |  0.0
      1 | 0.01
      2 | 0.02
    (3 rows)
    ```
    With this PR, Spark will apply the limit after percent_rank.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No (besides changing percent_rank's behavior to be more like Spark 3.1.3, Hive, and Prestodb).
    
    ### How was this patch tested?
    
    New unit tests.
    
    Closes #36951 from bersprockets/percent_rank_issue.
    
    Authored-by: Bruce Robbins <be...@gmail.com>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
    (cherry picked from commit a63ce5676e79f15903e9fd533a26a6c3ec9bf7a8)
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../sql/catalyst/optimizer/LimitPushDownThroughWindow.scala |  5 +++--
 .../optimizer/LimitPushdownThroughWindowSuite.scala         | 13 ++++++++++++-
 .../apache/spark/sql/DataFrameWindowFunctionsSuite.scala    | 13 +++++++++++++
 3 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
index eaea167ee9f..635434741b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, IntegerLiteral, NamedExpression, RankLike, RowFrame, RowNumberLike, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, NTile, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Project, Sort, Window}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, WINDOW}
@@ -33,7 +33,8 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] {
   // The window frame of RankLike and RowNumberLike can only be UNBOUNDED PRECEDING to CURRENT ROW.
   private def supportsPushdownThroughWindow(
       windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
-    case Alias(WindowExpression(_: RankLike | _: RowNumberLike, WindowSpecDefinition(Nil, _,
+    case Alias(WindowExpression(_: Rank | _: DenseRank | _: NTile | _: RowNumber,
+        WindowSpecDefinition(Nil, _,
         SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
     case _ => false
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
index f2c1f452d02..b09d10b2601 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{CurrentRow, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
+import org.apache.spark.sql.catalyst.expressions.{CurrentRow, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -187,4 +187,15 @@ class LimitPushdownThroughWindowSuite extends PlanTest {
       Optimize.execute(originalQuery.analyze),
       WithoutOptimize.execute(originalQuery.analyze))
   }
+
+  test("SPARK-38614: Should not push through percent_rank window function") {
+    val originalQuery = testRelation
+      .select(a, b, c,
+        windowExpr(new PercentRank(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn"))
+      .limit(2)
+
+    comparePlans(
+      Optimize.execute(originalQuery.analyze),
+      WithoutOptimize.execute(originalQuery.analyze))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 25d676f5d93..557b278f9c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1190,4 +1190,17 @@ class DataFrameWindowFunctionsSuite extends QueryTest
       )
     )
   }
+
+  test("SPARK-38614: percent_rank should apply before limit") {
+    val df = Seq.tabulate(101)(identity).toDF("id")
+    val w = Window.orderBy("id")
+    checkAnswer(
+      df.select($"id", percent_rank().over(w)).limit(3),
+      Seq(
+        Row(0, 0.0d),
+        Row(1, 0.01d),
+        Row(2, 0.02d)
+      )
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org