Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/11/05 10:34:01 UTC
[jira] [Commented] (SPARK-24066) Add new optimization rule to eliminate unnecessary sort by exchanged adjacent Window expressions
[ https://issues.apache.org/jira/browse/SPARK-24066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674930#comment-16674930 ]
Apache Spark commented on SPARK-24066:
--------------------------------------
User 'heary-cao' has created a pull request for this issue:
https://github.com/apache/spark/pull/22945
> Add new optimization rule to eliminate unnecessary sort by exchanged adjacent Window expressions
> ------------------------------------------------------------------------------------------------
>
> Key: SPARK-24066
> URL: https://issues.apache.org/jira/browse/SPARK-24066
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: caoxuewen
> Priority: Major
>
> Currently, when two adjacent Window operators use the same partitioning and the ordering of one is a prefix of the ordering of the other,
> the plan still performs two sorts after the shuffle, which is unnecessary. This PR adds a new optimization rule that exchanges such adjacent Window expressions so the redundant sort can be eliminated.
> For example:
> val plan = Seq(
>   ("a", "p1", 10.0, 20.0, 30.0),
>   ("a", "p2", 20.0, 10.0, 40.0)
> ).toDF("key", "value", "value1", "value2", "value3").select(
>   $"key",
>   sum("value1").over(Window.partitionBy("key").orderBy("value")),
>   max("value2").over(Window.partitionBy("key").orderBy("value", "value1")),
>   avg("value3").over(Window.partitionBy("key").orderBy("value", "value1", "value2"))
> ).queryExecution.executedPlan
>
> Before this PR:
> *(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
> +- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST]
>    +- *(4) Project [key#16, value1#18, value#17, value2#19, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
>       +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST]
>          +- *(3) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
>             +- Window [sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
>                +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(key#16, 5)
>                      +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
>                         +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]
>
> After this PR:
> *(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
> +- Window [sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
>    +- *(4) Project [key#16, value1#18, value#17, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30]
>       +- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST]
>          +- *(3) Project [key#16, value1#18, value#17, value2#19, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
>             +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST]
>                +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(key#16, 5)
>                      +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
>                         +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]
>
>
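The core precondition behind the rule — that adjacent Windows share the same partitioning and one ordering is a prefix of the other, so a single sort by the longest ordering satisfies all of them — can be sketched language-agnostically. Below is a minimal Python illustration of that check (a hypothetical helper for exposition, not the actual Catalyst implementation):

```python
def sort_is_redundant(partition_a, order_a, partition_b, order_b):
    """Return True if a Window with (partition_a, order_a) adjacent to a
    Window with (partition_b, order_b) needs no extra sort between them:
    same partitioning, and one ordering is a prefix of the other."""
    if partition_a != partition_b:
        return False
    shorter, longer = sorted([order_a, order_b], key=len)
    return longer[:len(shorter)] == shorter

# The three windows from the example above: same partitioning by "key",
# with orderings (value), (value, value1), (value, value1, value2).
w1 = (["key"], ["value"])
w2 = (["key"], ["value", "value1"])
w3 = (["key"], ["value", "value1", "value2"])

# Every pair is prefix-compatible, so one sort by the longest ordering
# (key, value, value1, value2) can serve all three windows once the
# optimizer reorders them from longest ordering to shortest.
assert sort_is_redundant(*w1, *w2)
assert sort_is_redundant(*w2, *w3)
assert sort_is_redundant(*w1, *w3)
```

This mirrors the optimized plan above: the Windows are reordered so the avg (longest ordering) runs first, directly over the single Sort, and the max and sum windows reuse that already-sorted data.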
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org