You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/09 02:41:01 UTC

[spark] branch branch-3.2 updated: [SPARK-40002][SQL] Don't push down limit through window using ntile

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 16a67883280 [SPARK-40002][SQL] Don't push down limit through window using ntile
16a67883280 is described below

commit 16a678832802ba631ce531d6f72d701543c081ac
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Tue Aug 9 11:39:58 2022 +0900

    [SPARK-40002][SQL] Don't push down limit through window using ntile
    
    ### What changes were proposed in this pull request?
    
    Change `LimitPushDownThroughWindow` so that it no longer supports pushing down a limit through a window using ntile.
    
    ### Why are the changes needed?
    
    In an unpartitioned window, the limit is currently pushed down below the window, so the ntile function is computed only over the rows that survive the limit rather than over the full input. This behavior produces results that conflict with Spark 3.1.3, Hive 2.3.9, and Prestodb 0.268.
    
    #### Example
    
    Assume this data:
    ```
    create table t1 stored as parquet as
    select *
    from range(101);
    ```
    Also assume this query:
    ```
    select id, ntile(10) over (order by id) as nt
    from t1
    limit 10;
    ```
    With Spark 3.2.2, Spark 3.3.0, and master, the limit is applied before the ntile function:
    ```
    +---+---+
    |id |nt |
    +---+---+
    |0  |1  |
    |1  |2  |
    |2  |3  |
    |3  |4  |
    |4  |5  |
    |5  |6  |
    |6  |7  |
    |7  |8  |
    |8  |9  |
    |9  |10 |
    +---+---+
    ```
    With Spark 3.1.3, Hive 2.3.9, and Prestodb 0.268, the limit is applied _after_ ntile.
    
    Spark 3.1.3:
    ```
    +---+---+
    |id |nt |
    +---+---+
    |0  |1  |
    |1  |1  |
    |2  |1  |
    |3  |1  |
    |4  |1  |
    |5  |1  |
    |6  |1  |
    |7  |1  |
    |8  |1  |
    |9  |1  |
    +---+---+
    ```
    Hive 2.3.9:
    ```
    +-----+-----+
    | id  | nt  |
    +-----+-----+
    | 0   | 1   |
    | 1   | 1   |
    | 2   | 1   |
    | 3   | 1   |
    | 4   | 1   |
    | 5   | 1   |
    | 6   | 1   |
    | 7   | 1   |
    | 8   | 1   |
    | 9   | 1   |
    +-----+-----+
    10 rows selected (1.72 seconds)
    ```
    Prestodb 0.268:
    ```
     id | nt
    ----+----
      0 |  1
      1 |  1
      2 |  1
      3 |  1
      4 |  1
      5 |  1
      6 |  1
      7 |  1
      8 |  1
      9 |  1
    (10 rows)
    
    ```
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Two new unit tests.
    
    Closes #37443 from bersprockets/pushdown_ntile.
    
    Authored-by: Bruce Robbins <be...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit c9156e5a3b9cb290c7cdda8db298c9875e67aa5e)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../sql/catalyst/optimizer/LimitPushDownThroughWindow.scala |  5 ++---
 .../optimizer/LimitPushdownThroughWindowSuite.scala         | 13 ++++++++++++-
 .../apache/spark/sql/DataFrameWindowFunctionsSuite.scala    | 13 +++++++++++++
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
index 635434741b9..88f92262dcc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, NTile, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Project, Sort, Window}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, WINDOW}
@@ -33,8 +33,7 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] {
   // The window frame of RankLike and RowNumberLike can only be UNBOUNDED PRECEDING to CURRENT ROW.
   private def supportsPushdownThroughWindow(
       windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
-    case Alias(WindowExpression(_: Rank | _: DenseRank | _: NTile | _: RowNumber,
-        WindowSpecDefinition(Nil, _,
+    case Alias(WindowExpression(_: Rank | _: DenseRank | _: RowNumber, WindowSpecDefinition(Nil, _,
         SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
     case _ => false
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
index b09d10b2601..99812d20bf5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{CurrentRow, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
+import org.apache.spark.sql.catalyst.expressions.{CurrentRow, NTile, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -198,4 +198,15 @@ class LimitPushdownThroughWindowSuite extends PlanTest {
       Optimize.execute(originalQuery.analyze),
       WithoutOptimize.execute(originalQuery.analyze))
   }
+
+  test("SPARK-40002: Should not push through ntile window function") {
+    val originalQuery = testRelation
+      .select(a, b, c,
+        windowExpr(new NTile(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("nt"))
+      .limit(2)
+
+    comparePlans(
+      Optimize.execute(originalQuery.analyze),
+      WithoutOptimize.execute(originalQuery.analyze))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 78bb3180ff1..6e8a06db9d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1127,4 +1127,17 @@ class DataFrameWindowFunctionsSuite extends QueryTest
       )
     )
   }
+
+  test("SPARK-40002: ntile should apply before limit") {
+    val df = Seq.tabulate(101)(identity).toDF("id")
+    val w = Window.orderBy("id")
+    checkAnswer(
+      df.select($"id", ntile(10).over(w)).limit(3),
+      Seq(
+        Row(0, 1),
+        Row(1, 1),
+        Row(2, 1)
+      )
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org