Posted to issues@spark.apache.org by "Wenchen Fan (Jira)" <ji...@apache.org> on 2022/08/05 10:47:00 UTC

[jira] [Resolved] (SPARK-38034) Optimize time complexity and extend applicable cases for TransposeWindow

     [ https://issues.apache.org/jira/browse/SPARK-38034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-38034.
---------------------------------
    Fix Version/s: 3.4.0
                   3.3.1
                   3.2.3
         Assignee: zhou xiang
       Resolution: Fixed

> Optimize time complexity and extend applicable cases for TransposeWindow 
> -------------------------------------------------------------------------
>
>                 Key: SPARK-38034
>                 URL: https://issues.apache.org/jira/browse/SPARK-38034
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: zhou xiang
>            Assignee: zhou xiang
>            Priority: Minor
>             Fix For: 3.4.0, 3.3.1, 3.2.3
>
>
> The TransposeWindow rule transposes adjacent Window operators to eliminate unnecessary shuffles:
> {code:java}
> /**
>  * Transpose Adjacent Window Expressions.
>  * - If the partition spec of the parent Window expression is compatible with the partition spec
>  *   of the child window expression, transpose them.
>  */
> object TransposeWindow extends Rule[LogicalPlan] {
>   // True when ps1 matches some permutation of the first ps1.length elements of ps2,
>   // i.e. the parent's partition spec is a reordered prefix of the child's.
>   private def compatiblePartitions(ps1: Seq[Expression], ps2: Seq[Expression]): Boolean = {
>     ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall {
>       case (l, r) => l.semanticEquals(r)
>     })
>   }
>   // The windows can be swapped only when both are deterministic, the parent does not
>   // read the child's window output, and the partition specs are compatible.
>   private def windowsCompatible(w1: Window, w2: Window): Boolean = {
>     w1.references.intersect(w2.windowOutputSet).isEmpty &&
>       w1.expressions.forall(_.deterministic) &&
>       w2.expressions.forall(_.deterministic) &&
>       compatiblePartitions(w1.partitionSpec, w2.partitionSpec)
>   }
>   def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
>     _.containsPattern(WINDOW), ruleId) {
>     case w1 @ Window(_, _, _, w2 @ Window(_, _, _, grandChild))
>       if windowsCompatible(w1, w2) =>
>       Project(w1.output, w2.copy(child = w1.copy(child = grandChild)))
>     case w1 @ Window(_, _, _, Project(pl, w2 @ Window(_, _, _, grandChild)))
>       if windowsCompatible(w1, w2) && w1.references.subsetOf(grandChild.outputSet) =>
>       Project(
>         pl ++ w1.windowOutputSet,
>         w2.copy(child = w1.copy(child = grandChild)))
>   }
> } {code}
> However, compatiblePartitions only compares ps1 against the first ps1.length elements of ps2, so in some cases the rule does not take effect, like the case below:
> {code:java}
> val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d") 
> df.selectExpr(
>     "sum(`d`) OVER(PARTITION BY `b`,`a`) as e", 
>     "sum(`c`) OVER(PARTITION BY `a`) as f"
>   ).explain
> {code}
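> Here the parent window's spec is (a) and the child's is (b, a): take(1) keeps only b, which is not semantically equal to a, so the check fails even though (a) is a subset of (b, a). A minimal standalone sketch of the same prefix/permutation logic (hypothetical name, semanticEquals simplified to == on strings) makes this visible:
> {code:java}
> // Stand-in for compatiblePartitions, with semanticEquals simplified to ==.
> def prefixCompatible(ps1: Seq[String], ps2: Seq[String]): Boolean =
>   ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists { perm =>
>     ps1.zip(perm).forall { case (l, r) => l == r }
>   }
>
> prefixCompatible(Seq("a"), Seq("a", "b"))  // true: (a) is a reordered prefix of (a, b)
> prefixCompatible(Seq("a"), Seq("b", "a"))  // false: take(1) keeps only (b), so the match fails
> {code}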
> Current plan:
> {code:java}
> == Physical Plan ==
> *(5) Project [e#10L, f#11L]
> +- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L]
>    +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#2L, 200), true, [id=#41]
>          +- *(3) Project [a#2L, c#4L, e#10L]
>             +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L]
>                +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33]
>                      +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L]
>                         +- *(1) Range (0, 10, step=1, splits=10) {code}
> Expected plan (a single exchange: hashpartitioning on a#40L already co-locates all rows sharing (b#41L, a#40L), so the second window only needs a sort):
> {code:java}
> == Physical Plan ==
> *(4) Project [e#924L, f#925L]
> +- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L]
>    +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0
>       +- *(3) Project [d#43L, b#41L, a#40L, f#925L]
>          +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L]
>             +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(a#40L, 200), true, [id=#282]
>                   +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L]
>                      +- *(1) Range (0, 10, step=1, splits=10) {code}
> Also, the permutations call has O(n!) time complexity, which is very expensive when there are many partition columns; we could optimize it as well.
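> One way to address both problems at once (a sketch, not necessarily the exact change that was merged): replace the reordered-prefix test with a semantic subset test. It accepts the (b, a) case above and costs O(|ps1| * |ps2|) semanticEquals calls instead of O(n!):
> {code:java}
> // Sketch: treat the specs as compatible when the parent's partition expressions
> // are a strict semantic subset of the child's, regardless of their order.
> private def compatiblePartitions(ps1: Seq[Expression], ps2: Seq[Expression]): Boolean = {
>   ps1.length < ps2.length && ps1.forall { expr1 =>
>     ps2.exists(expr1.semanticEquals)
>   }
> }
> {code}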



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org