Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/10/14 04:44:00 UTC
[jira] [Commented] (SPARK-40382) Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children
[ https://issues.apache.org/jira/browse/SPARK-40382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17617466#comment-17617466 ]
Apache Spark commented on SPARK-40382:
--------------------------------------
User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/38250
> Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children
> ------------------------------------------------------------------------------------------------------
>
> Key: SPARK-40382
> URL: https://issues.apache.org/jira/browse/SPARK-40382
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Bruce Robbins
> Assignee: Bruce Robbins
> Priority: Major
> Fix For: 3.4.0
>
>
> In RewriteDistinctAggregates, when grouping aggregate expressions by function children, we should treat children that are semantically equivalent as the same.
> This proposed change potentially reduces the number of projections in the Expand operator added to a plan. In some cases, it may eliminate the need for an Expand operator.
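> The grouping idea can be sketched with a toy model. The case classes and `canonicalize` function below are illustrative stand-ins, not Spark's actual `Expression`/`canonicalized` machinery: a commutative `Add` is canonicalized by ordering its operands deterministically, so that b + 1 and 1 + b fall into the same group.

```scala
// Toy model of grouping distinct-aggregate children by a canonical form.
// These case classes are illustrative stand-ins, not Spark's Expression API.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// Canonicalize commutative Add by ordering its operands deterministically,
// mirroring the idea behind Spark's semantic-equivalence comparison.
def canonicalize(e: Expr): Expr = e match {
  case Add(l, r) =>
    val (cl, cr) = (canonicalize(l), canonicalize(r))
    if (cl.toString <= cr.toString) Add(cl, cr) else Add(cr, cl)
  case leaf => leaf
}

// Children of the aggregate functions in the example query below:
// count(distinct b + 1), avg(distinct 1 + b), sum(c)
val children = Seq(Add(Attr("b"), Lit(1)), Add(Lit(1), Attr("b")), Attr("c"))

// Grouping by the canonical form yields 2 groups instead of 3, so the
// Expand operator needs one projection fewer.
val groups: Map[Expr, Seq[Expr]] = children.groupBy(canonicalize)
```

> In Spark itself the comparison goes through `Expression.semanticEquals` (via canonicalized expression trees); the sketch only shows why b + 1 and 1 + b end up sharing one group.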
> Example: In the following query, the Expand operator creates 3*n rows (where n is the number of incoming rows) because it has a projection for function children b + 1, 1 + b and c.
> {noformat}
> create or replace temp view v1 as
> select * from values
>   (1, 2, 3.0),
>   (1, 3, 4.0),
>   (2, 4, 2.5),
>   (2, 3, 1.0)
>   v1(a, b, c);
>
> select
>   a,
>   count(distinct b + 1),
>   avg(distinct 1 + b) filter (where c > 0),
>   sum(c)
> from
>   v1
> group by a;
> {noformat}
> The Expand operator has three projections (each producing a row for each incoming row):
> {noformat}
> [a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation)
> [a#87, (b#88 + 1), null, 1, null, null], <== projection #2 (for distinct aggregation of b + 1)
> [a#87, null, (1 + b#88), 2, (c#89 > 0.0), null], <== projection #3 (for distinct aggregation of 1 + b)
> {noformat}
> In reality, the Expand only needs one projection for 1 + b and b + 1, because they are semantically equivalent.
> With the proposed change, the Expand operator's projections look like this:
> {noformat}
> [a#67, null, 0, null, UnscaledValue(c#69)], <== projection #1 (for regular aggregations)
> [a#67, (b#68 + 1), 1, (c#69 > 0.0), null], <== projection #2 (for distinct aggregation on b + 1 and 1 + b)
> {noformat}
> With one fewer projection, Expand produces 2*n rows instead of 3*n rows, yet still produces the correct result.
> In the case where all distinct aggregates have semantically equivalent children, the Expand operator is not needed at all.
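> A hedged sketch of why that case collapses: canonicalizing the three distinct children in the benchmark's first query (id1 + 1, 1 + id1, 1 + ID1; attribute names are resolved case-insensitively) yields a single group, and a single distinct group needs no multi-distinct rewrite and hence no Expand. The string-based canonical key below is a deliberately crude stand-in for Spark's expression canonicalization:

```scala
// Crude stand-in for expression canonicalization: lower-case the text and
// sort the operands of `+`, so "id1 + 1", "1 + id1" and "1 + ID1" all
// normalize to the same key. Not Spark code; purely illustrative.
def canonicalKey(child: String): String =
  child.toLowerCase.split("\\+").map(_.trim).sorted.mkString(" + ")

val distinctChildren = Seq("id1 + 1", "1 + id1", "1 + ID1")

// One group means a single-distinct plan: no Expand operator is required.
val groupCount: Int = distinctChildren.map(canonicalKey).distinct.size
```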
> Assume this benchmark:
> {noformat}
> runBenchmark("distinct aggregates") {
>   val N = 20 << 22
>   val benchmark = new Benchmark("distinct aggregates", N, output = output)
>   spark.range(N).selectExpr("id % 100 as k", "id % 10 as id1")
>     .createOrReplaceTempView("test")
>
>   def f1(): Unit = spark.sql(
>     """
>       select
>         k,
>         sum(distinct id1 + 1),
>         count(distinct 1 + id1),
>         avg(distinct 1 + ID1)
>       from
>         test
>       group by k""").noop()
>
>   benchmark.addCase("all semantically equivalent", numIters = 2) { _ =>
>     f1()
>   }
>
>   def f2(): Unit = spark.sql(
>     """
>       select
>         k,
>         sum(distinct id1 + 1),
>         count(distinct 1 + id1),
>         avg(distinct 2 + ID1)
>       from
>         test
>       group by k""").noop()
>
>   benchmark.addCase("some semantically equivalent", numIters = 2) { _ =>
>     f2()
>   }
>
>   def f3(): Unit = spark.sql(
>     """
>       select
>         k,
>         sum(distinct id1 + 1),
>         count(distinct 3 + id1),
>         avg(distinct 2 + ID1)
>       from
>         test
>       group by k""").noop()
>
>   benchmark.addCase("none semantically equivalent", numIters = 2) { _ =>
>     f3()
>   }
>
>   benchmark.run()
> }
> {noformat}
> Before the change:
> {noformat}
> [info] distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
> [info] ------------------------------------------------------------------------------------------------------------------------
> [info] all semantically equivalent                       14721          14859         195         5.7         175.5       1.0X
> [info] some semantically equivalent                      14569          14572           5         5.8         173.7       1.0X
> [info] none semantically equivalent                      14408          14488         113         5.8         171.8       1.0X
> {noformat}
> After the proposed change:
> {noformat}
> [info] distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
> [info] ------------------------------------------------------------------------------------------------------------------------
> [info] all semantically equivalent                        3658           3692          49        22.9          43.6       1.0X
> [info] some semantically equivalent                       9124           9214         127         9.2         108.8       0.4X
> [info] none semantically equivalent                      14601          14777         250         5.7         174.1       0.3X
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org