Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/10/14 04:44:00 UTC

[jira] [Commented] (SPARK-40382) Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children

    [ https://issues.apache.org/jira/browse/SPARK-40382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17617466#comment-17617466 ] 

Apache Spark commented on SPARK-40382:
--------------------------------------

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/38250

> Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-40382
>                 URL: https://issues.apache.org/jira/browse/SPARK-40382
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Major
>             Fix For: 3.4.0
>
>
> In RewriteDistinctAggregates, when grouping aggregate expressions by function children, we should treat children that are semantically equivalent as the same.
> This proposed change potentially reduces the number of projections in the Expand operator added to a plan. In some cases, it may eliminate the need for an Expand operator.
> Example: In the following query, the Expand operator creates 3*n rows (where n is the number of incoming rows) because it has one projection for each of the function children b + 1, 1 + b, and c.
> {noformat}
> create or replace temp view v1 as
> select * from values
> (1, 2, 3.0),
> (1, 3, 4.0),
> (2, 4, 2.5),
> (2, 3, 1.0)
> v1(a, b, c);
> select
>   a,
>   count(distinct b + 1),
>   avg(distinct 1 + b) filter (where c > 0),
>   sum(c)
> from
>   v1
> group by a;
> {noformat}
> The Expand operator has three projections (each producing a row for each incoming row):
> {noformat}
> [a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation)
> [a#87, (b#88 + 1), null, 1, null, null],          <== projection #2 (for distinct aggregation of b + 1)
> [a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for distinct aggregation of 1 + b)
> {noformat}
> In fact, the Expand operator needs only one projection for 1 + b and b + 1, because they are semantically equivalent.
> With the proposed change, the Expand operator's projections look like this:
> {noformat}
> [a#67, null, 0, null, UnscaledValue(c#69)],  <== projection #1 (for regular aggregations)
> [a#67, (b#68 + 1), 1, (c#69 > 0.0), null]],  <== projection #2 (for distinct aggregation on b + 1 and 1 + b)
> {noformat}
> With one fewer projection, Expand produces 2*n rows instead of 3*n rows, but still produces the correct result.
> In the case where all distinct aggregates have semantically equivalent children, the Expand operator is not needed at all.
> Consider this benchmark:
> {noformat}
>     runBenchmark("distinct aggregates") {
>       val N = 20 << 22
>       val benchmark = new Benchmark("distinct aggregates", N, output = output)
>       spark.range(N).selectExpr("id % 100 as k", "id % 10 as id1")
>         .createOrReplaceTempView("test")
>       def f1(): Unit = spark.sql(
>         """
>           select
>             k,
>             sum(distinct id1 + 1),
>             count(distinct 1 + id1),
>             avg(distinct 1 + ID1)
>           from
>             test
>           group by k""").noop()
>       benchmark.addCase("all semantically equivalent", numIters = 2) { _ =>
>         f1()
>       }
>       def f2(): Unit = spark.sql(
>         """
>           select
>             k,
>             sum(distinct id1 + 1),
>             count(distinct 1 + id1),
>             avg(distinct 2 + ID1)
>           from
>             test
>           group by k""").noop()
>       benchmark.addCase("some semantically equivalent", numIters = 2) { _ =>
>         f2()
>       }
>       def f3(): Unit = spark.sql(
>         """
>           select
>             k,
>             sum(distinct id1 + 1),
>             count(distinct 3 + id1),
>             avg(distinct 2 + ID1)
>           from
>             test
>           group by k""").noop()
>       benchmark.addCase("none semantically equivalent", numIters = 2) { _ =>
>         f3()
>       }
>       benchmark.run()
>     }
> {noformat}
> Before the change:
> {noformat}
> [info] distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
> [info] ------------------------------------------------------------------------------------------------------------------------
> [info] all semantically equivalent                       14721          14859         195          5.7         175.5       1.0X
> [info] some semantically equivalent                      14569          14572           5          5.8         173.7       1.0X
> [info] none semantically equivalent                      14408          14488         113          5.8         171.8       1.0X
> {noformat}
> After the proposed change:
> {noformat}
> [info] distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
> [info] ------------------------------------------------------------------------------------------------------------------------
> [info] all semantically equivalent                        3658           3692          49         22.9          43.6       1.0X
> [info] some semantically equivalent                       9124           9214         127          9.2         108.8       0.4X
> [info] none semantically equivalent                      14601          14777         250          5.7         174.1       0.3X
> {noformat}
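
The grouping idea in the quoted description can be illustrated with a small, self-contained sketch. It is a toy model, not Spark's RewriteDistinctAggregates code: the Expr, Col, Lit, and Add types and the canonical and projections helpers are hypothetical names invented here, and the canonical form (lowercased column names, sorted operands of a commutative Add) is an assumption standing in for Catalyst's notion of semantic equivalence. It counts Expand projections for the v1 example when distinct children are grouped by syntactic equality versus by canonical form.

```scala
// Toy model only: none of these types are Spark's Catalyst classes.
sealed trait Expr
final case class Col(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object ExpandSketch {
  // Assumed canonical form: column names are lowercased (case-insensitive
  // resolution) and the operands of the commutative Add get a fixed order.
  def canonical(e: Expr): Expr = e match {
    case Col(n) => Col(n.toLowerCase)
    case Lit(v) => Lit(v)
    case Add(l, r) =>
      val ops = List(canonical(l), canonical(r)).sortBy(_.toString)
      Add(ops(0), ops(1))
  }

  // Projections in this toy model: one per group of equivalent distinct
  // children, plus one if there are regular (non-distinct) aggregates.
  def projections(distinctChildren: Seq[Expr], hasRegularAggs: Boolean): Int =
    distinctChildren.map(canonical).distinct.size + (if (hasRegularAggs) 1 else 0)

  def main(args: Array[String]): Unit = {
    val b = Col("b")
    val children = Seq(Add(b, Lit(1)), Add(Lit(1), b)) // b + 1 and 1 + b
    val n = 4                                          // rows in v1

    // Grouping by syntactic equality keeps b + 1 and 1 + b apart.
    val before = children.distinct.size + 1
    // Grouping by canonical form merges them into one group.
    val after = projections(children, hasRegularAggs = true)

    println(s"before: $before projections, ${before * n} rows") // before: 3 projections, 12 rows
    println(s"after: $after projections, ${after * n} rows")    // after: 2 projections, 8 rows
  }
}
```

In this model, the three distinct children of the "all semantically equivalent" benchmark case collapse into a single group with no regular aggregates, which corresponds to the situation where the description says no Expand operator is needed.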



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
