Posted to issues@spark.apache.org by "Michael Tong (Jira)" <ji...@apache.org> on 2022/10/17 22:47:00 UTC
[jira] [Created] (SPARK-40824) Certain aggregations cause extra exchange steps on unioned and bucketed tables
Michael Tong created SPARK-40824:
------------------------------------
Summary: Certain aggregations cause extra exchange steps on unioned and bucketed tables
Key: SPARK-40824
URL: https://issues.apache.org/jira/browse/SPARK-40824
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.3.0
Reporter: Michael Tong
An extension to https://issues.apache.org/jira/browse/SPARK-22898
I am currently working on a POC where we store aggregations of features across different datasets. I have noticed that certain aggregations across multiple bucketed tables cause Spark to introduce an extra exchange step:
{code:java}
# initializing the tables
sql("""
CREATE TABLE t1 (`id` BIGINT, `value` INT)
USING PARQUET
CLUSTERED BY (id)
INTO 1 BUCKETS
""")
sql("""
CREATE TABLE t2 (`id` BIGINT, `value` INT)
USING PARQUET
CLUSTERED BY (id)
INTO 1 BUCKETS
""")
sql("INSERT INTO TABLE t1 VALUES(1, 2)")
sql("INSERT INTO TABLE t2 VALUES(1, 3)")
# aggregation, note the exchange after the union operation
sql("""
SELECT id, COUNT(*)
FROM (SELECT id FROM t1 UNION SELECT id FROM t2)
GROUP BY id
""").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#92L], functions=[count(1)])
   +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      +- HashAggregate(keys=[id#92L], functions=[])
         +- Exchange hashpartitioning(id#92L, 100), ENSURE_REQUIREMENTS, [id=#202]
            +- HashAggregate(keys=[id#92L], functions=[])
               +- Union
                  :- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
{code}
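To check for the shuffle without reading the plan by eye, the plan text can be scanned for Exchange operators. The following is only a minimal sketch: count_exchanges is a hypothetical helper, not a Spark API, and it assumes the physical plan has already been captured as a string. It matches only nodes whose operator name is exactly Exchange, so variants such as BroadcastExchange are not counted.

```python
import re


def count_exchanges(plan_text: str) -> int:
    """Count shuffle Exchange operators in the text of a physical plan.

    Hypothetical helper for illustration; not part of Spark.
    """
    # A plan node starts with optional tree-drawing characters
    # (spaces, ':', '+', '-') followed by the operator name.
    pattern = re.compile(r"^[ \t:+\-]*Exchange\b", re.MULTILINE)
    return len(pattern.findall(plan_text))


# Abbreviated copy of the union plan above (FileScan leaves omitted).
union_plan = """\
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#92L], functions=[count(1)])
   +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      +- HashAggregate(keys=[id#92L], functions=[])
         +- Exchange hashpartitioning(id#92L, 100), ENSURE_REQUIREMENTS, [id=#202]
            +- HashAggregate(keys=[id#92L], functions=[])
               +- Union
"""

print(count_exchanges(union_plan))  # prints 1
```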
This looks like an issue with the query optimizer: if you express the computation with a different set and order of operations (here, group by and count on each table individually, join the two results, then derive the combined count from the joined values), the resulting query plan does not contain this exchange step:
{code:java}
sql("""
SELECT t1_agg.id, t1_agg.count + t2_agg.count AS count
FROM (SELECT id, COUNT(*) AS count FROM t1 GROUP BY id) AS t1_agg
JOIN (SELECT id, COUNT(*) AS count FROM t2 GROUP BY id) AS t2_agg ON t1_agg.id = t2_agg.id
""").explain()
# note the lack of an exchange step
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#92L, (count#121L + count#122L) AS count#123L]
   +- SortMergeJoin [id#92L], [id#94L], Inner
      :- Sort [id#92L ASC NULLS FIRST], false, 0
      :  +- HashAggregate(keys=[id#92L], functions=[count(1)])
      :     +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      :        +- Filter isnotnull(id#92L)
      :           +- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#92L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
      +- Sort [id#94L ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[id#94L], functions=[count(1)])
            +- HashAggregate(keys=[id#94L], functions=[partial_count(1)])
               +- Filter isnotnull(id#94L)
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#94L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
{code}
Like the second query, the first union-then-aggregate query should not need an exchange step, since both tables are bucketed on id.
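One caveat when comparing the two plans: as written, the two queries do not return the same result. UNION removes duplicate rows before the GROUP BY, while the join-based query adds the per-table counts and keeps only ids present in both tables. The sketch below models this in plain Python on the sample rows inserted above. It is not Spark code, and the function names are made up for illustration.

```python
from collections import Counter

# The ids inserted into t1 and t2 in the example above.
t1_ids = [1]
t2_ids = [1]


def union_then_count(a, b):
    # UNION de-duplicates rows, so each id survives at most once
    # before GROUP BY id / COUNT(*).
    return dict(Counter(set(a) | set(b)))


def union_all_then_count(a, b):
    # UNION ALL keeps duplicates, so the count is the total number of rows per id.
    return dict(Counter(a + b))


def count_then_join(a, b):
    # Per-table GROUP BY / COUNT(*), then an inner join on id and a sum of counts.
    count_a, count_b = Counter(a), Counter(b)
    return {i: count_a[i] + count_b[i] for i in count_a if i in count_b}


print(union_then_count(t1_ids, t2_ids))      # {1: 1}
print(union_all_then_count(t1_ids, t2_ids))  # {1: 2}
print(count_then_join(t1_ids, t2_ids))       # {1: 2}
```

On this data the join-based query agrees with a UNION ALL formulation rather than with UNION, and the inner join would additionally drop any id that appears in only one table.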