Posted to commits@spark.apache.org by we...@apache.org on 2022/08/05 10:43:52 UTC

[spark] branch branch-3.2 updated: [SPARK-38034][SQL] Optimize TransposeWindow rule

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 0f609ffe86b [SPARK-38034][SQL] Optimize TransposeWindow rule
0f609ffe86b is described below

commit 0f609ffe86beeb0cc039ac85091544714f66c459
Author: xzhou <15...@163.com>
AuthorDate: Fri Aug 5 18:42:57 2022 +0800

    [SPARK-38034][SQL] Optimize TransposeWindow rule
    
    ### What changes were proposed in this pull request?
    
    Optimize the TransposeWindow rule to cover more cases and to reduce its time complexity.
    The TransposeWindow rule tries to eliminate unnecessary shuffles.
    
    However, the function compatiblePartitions only takes the first n elements of the window2 partition sequence (n being the length of the other partition sequence), so in some cases the rule does not take effect, such as the case below:
    
    val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d")
    df.selectExpr(
        "sum(`d`) OVER(PARTITION BY `b`,`a`) as e",
        "sum(`c`) OVER(PARTITION BY `a`) as f"
      ).explain
    
    Current plan:
    
    == Physical Plan ==
    *(5) Project [e#10L, f#11L]
    +- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L]
       +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(a#2L, 200), true, [id=#41]
             +- *(3) Project [a#2L, c#4L, e#10L]
                +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L]
                   +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0
                      +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33]
                         +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L]
                            +- *(1) Range (0, 10, step=1, splits=10)
    
    Expected plan:
    
    == Physical Plan ==
    *(4) Project [e#924L, f#925L]
    +- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L]
       +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0
          +- *(3) Project [d#43L, b#41L, a#40L, f#925L]
             +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L]
                +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0
                   +- Exchange hashpartitioning(a#40L, 200), true, [id=#282]
                      +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L]
                         +- *(1) Range (0, 10, step=1, splits=10)
    
    In addition, the permutations method has O(n!) time complexity, which is very expensive when there are many partition columns. This change replaces it with a cheaper check.
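
    The difference between the two checks can be sketched on plain strings.
    This is a simplified illustration, not the Catalyst code: String stands in
    for Expression, == stands in for semanticEquals, and the names
    CompatiblePartitionsSketch, oldCheck and newCheck are hypothetical.
    The inputs ps1 = [a] and ps2 = [b, a] are assumed to mirror the example query.

    ```scala
    // Simplified stand-in: strings replace Catalyst Expressions,
    // and == replaces Expression.semanticEquals (an assumption for illustration).
    object CompatiblePartitionsSketch {
      // Check before this patch: only the first ps1.length elements of ps2
      // are considered, in any order.
      def oldCheck(ps1: Seq[String], ps2: Seq[String]): Boolean =
        ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(
          ps1.zip(_).forall { case (l, r) => l == r })

      // Check after this patch: every element of ps1 must appear somewhere in ps2.
      def newCheck(ps1: Seq[String], ps2: Seq[String]): Boolean =
        ps1.length < ps2.length && ps1.forall(e1 => ps2.exists(_ == e1))

      def main(args: Array[String]): Unit = {
        // "a" is not among the first 1 element(s) of [b, a], so the old check fails.
        println(oldCheck(Seq("a"), Seq("b", "a"))) // false
        println(newCheck(Seq("a"), Seq("b", "a"))) // true
        // When ps1 matches a prefix of ps2, both checks agree.
        println(oldCheck(Seq("a"), Seq("a", "b"))) // true
        println(newCheck(Seq("a"), Seq("a", "b"))) // true
      }
    }
    ```

    With the old check, a partition sequence such as [a] only matches when
    its elements sit at the front of the longer sequence. The new check accepts
    them at any position.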
    
    ### Why are the changes needed?
    
    The rule can now be applied in more cases, which can improve execution performance by eliminating unnecessary shuffles. Reducing the time complexity from O(n!) to O(n^2) also improves the performance of the rule itself.
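
    To give a rough sense of the gap between O(n!) and O(n^2), the sketch
    below prints both quantities for a few values of n. It is only an
    illustration of the growth rates, not a benchmark of the rule, and the
    names RuleCostSketch, factorial and pairwise are hypothetical.

    ```scala
    object RuleCostSketch {
      // Number of orderings of n elements, i.e. the worst-case number of
      // candidates a permutation-based check may enumerate.
      def factorial(n: Int): BigInt = (1 to n).foldLeft(BigInt(1))(_ * _)

      // Upper bound on pairwise comparisons for a forall/exists check
      // when both sequences have about n elements.
      def pairwise(n: Int): BigInt = BigInt(n) * n

      def main(args: Array[String]): Unit =
        for (n <- Seq(5, 10, 15))
          println(s"n=$n permutations=${factorial(n)} pairwise=${pairwise(n)}")
    }
    ```

    For n = 10 this gives 3628800 orderings against 100 pairwise comparisons.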
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a unit test in TransposeWindowSuite.
    
    Closes #35334 from constzhou/SPARK-38034_optimize_transpose_window_rule.
    
    Authored-by: xzhou <15...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 0cc331dc7e51e53000063052b0c8ace417eb281b)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala |  6 +++---
 .../sql/catalyst/optimizer/TransposeWindowSuite.scala   | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9522dc5f55a..e0f94dfdc74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1094,9 +1094,9 @@ object CollapseWindow extends Rule[LogicalPlan] {
  */
 object TransposeWindow extends Rule[LogicalPlan] {
   private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = {
-    ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall {
-      case (l, r) => l.semanticEquals(r)
-    })
+    ps1.length < ps2.length && ps1.forall { expr1 =>
+      ps2.exists(expr1.semanticEquals)
+    }
   }
 
   private def windowsCompatible(w1: Window, w2: Window): Boolean = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
index 4fd681d4ced..4834652368f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
@@ -142,4 +142,21 @@ class TransposeWindowSuite extends PlanTest {
     comparePlans(optimized, analyzed)
   }
 
+  test("SPARK-38034: transpose two adjacent windows with compatible partitions " +
+    "which is not a prefix") {
+    val query = testRelation
+      .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, orderSpec2)
+      .window(Seq(sum(c).as('sum_a_1)), partitionSpec3, orderSpec1)
+
+    val analyzed = query.analyze
+    val optimized = Optimize.execute(analyzed)
+
+    val correctAnswer = testRelation
+      .window(Seq(sum(c).as('sum_a_1)), partitionSpec3, orderSpec1)
+      .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, orderSpec2)
+      .select('a, 'b, 'c, 'd, 'sum_a_2, 'sum_a_1)
+
+    comparePlans(optimized, correctAnswer.analyze)
+  }
+
 }

