Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2019/04/26 02:10:00 UTC

[jira] [Updated] (SPARK-27573) Collapse adjacent physical aggregate operators when possible

     [ https://issues.apache.org/jira/browse/SPARK-27573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-27573:
-------------------------------
    Summary: Collapse adjacent physical aggregate operators when possible  (was: Collapse adjacent aggregate physical operators when possible)

> Collapse adjacent physical aggregate operators when possible
> ------------------------------------------------------------
>
>                 Key: SPARK-27573
>                 URL: https://issues.apache.org/jira/browse/SPARK-27573
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 2.4.0
>            Reporter: Josh Rosen
>            Priority: Major
>
> When an aggregation requires a shuffle, Spark SQL performs separate partial and final aggregations:
> {code:java}
> sql("select id % 100 as k, id as v from range(100000)")
>   .groupBy("k")
>   .sum("v")
>   .explain
> == Physical Plan ==
> *(2) HashAggregate(keys=[k#64L], functions=[sum(v#65L)])
> +- Exchange(coordinator id: 2031684357) hashpartitioning(k#64L, 5340), coordinator[target post-shuffle partition size: 67108864]
>    +- *(1) HashAggregate(keys=[k#64L], functions=[partial_sum(v#65L)])
>       +- *(1) Project [(id#66L % 100) AS k#64L, id#66L AS v#65L]
>          +- *(1) Range (0, 100000, step=1, splits=10)
> {code}
> However, consider what happens if the dataset being aggregated is already pre-partitioned by the aggregate's grouping columns:
> {code:java}
> sql("select id % 100 as k, id as v from range(100000)")
>   .repartition(10, $"k")
>   .groupBy("k")
>   .sum("v")
>   .explain
> == Physical Plan ==
> *(2) HashAggregate(keys=[k#50L], functions=[sum(v#51L)], output=[k#50L, sum(v)#58L])
> +- *(2) HashAggregate(keys=[k#50L], functions=[partial_sum(v#51L)], output=[k#50L, sum#63L])
>    +- Exchange(coordinator id: 39015877) hashpartitioning(k#50L, 10), coordinator[target post-shuffle partition size: 67108864]
>       +- *(1) Project [(id#52L % 100) AS k#50L, id#52L AS v#51L]
>          +- *(1) Range (0, 100000, step=1, splits=10) 
> {code}
> Here, we end up with back-to-back HashAggregate operators which are performed as part of the same stage.
> For certain aggregates (e.g. _sum_, _count_), this duplication is unnecessary: we could have just performed a total aggregation instead!
> The duplicate aggregate is problematic in cases where the aggregate inputs and outputs are of the same order of magnitude (e.g. counting the number of duplicate records in a dataset where duplicates are extremely rare).
> My motivation for this optimization is similar to SPARK-1412: I know that partial aggregation doesn't help for my workload, so I want to somehow coerce Spark into skipping the ineffective partial aggregation and jumping directly to total aggregation.
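
To illustrate why the second aggregate is redundant for _sum_ once the data is partitioned by the grouping key, here is a minimal sketch using plain Scala collections. It is not Spark's implementation: the object and the helpers partialSum, finalSum and totalSum are hypothetical names that only model the partial, final and total aggregation steps described above, and the key-modulo partitioning merely stands in for hashpartitioning(k, 10).

```scala
// Plain Scala collections only; no Spark dependency.
// All names below are hypothetical and exist only for this illustration.
object CollapseAggregatesSketch {
  type Row = (Long, Long) // (k, v)

  // Models the partial aggregate: one buffer row per key seen in a partition.
  def partialSum(rows: Seq[Row]): Map[Long, Long] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Models the final aggregate: merges partial buffers that share a key.
  def finalSum(buffers: Seq[Row]): Map[Long, Long] =
    buffers.groupBy(_._1).map { case (k, bs) => k -> bs.map(_._2).sum }

  // Models a total aggregation: a single pass straight over the input rows.
  def totalSum(rows: Seq[Row]): Map[Long, Long] =
    rows.foldLeft(Map.empty[Long, Long]) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + v)
    }

  def main(args: Array[String]): Unit = {
    // Same data as the example above: k = id % 100, v = id.
    val rows: Seq[Row] = (0L until 100000L).map(id => (id % 100, id))

    // Stand-in for repartition(10, $"k"): all rows for a key share one partition.
    val partitions = rows.groupBy { case (k, _) => k % 10 }.values.toSeq

    // Back-to-back partial + final aggregation within each partition.
    val twoStep = partitions.flatMap(p => finalSum(partialSum(p).toSeq)).toMap
    // A single total aggregation within each partition.
    val oneStep = partitions.flatMap(p => totalSum(p)).toMap

    assert(twoStep == oneStep)
    println(s"groups: ${oneStep.size}, results agree: ${twoStep == oneStep}")
  }
}
```

Because every row for a given key already sits in one partition, the partial step emits exactly one buffer per key and the final step has nothing left to merge, which is the redundancy the issue proposes to eliminate.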


