Posted to issues@spark.apache.org by "Michael Tong (Jira)" <ji...@apache.org> on 2022/10/17 22:47:00 UTC

[jira] [Created] (SPARK-40824) Certain aggregations cause extra exchange steps on unioned and bucketed tables

Michael Tong created SPARK-40824:
------------------------------------

             Summary: Certain aggregations cause extra exchange steps on unioned and bucketed tables
                 Key: SPARK-40824
                 URL: https://issues.apache.org/jira/browse/SPARK-40824
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.3.0
            Reporter: Michael Tong


An extension to https://issues.apache.org/jira/browse/SPARK-22898

 

I am currently working on a POC in which we store aggregations of features across different datasets. I have noticed that certain aggregation operations across multiple tables cause Spark to introduce an extra exchange step, even though the tables are bucketed on the grouping key:

 
{code:java}
# initializing the tables
sql("""
    CREATE TABLE t1 (`id` BIGINT, `value` INT)
    USING PARQUET
    CLUSTERED BY (id)
    INTO 1 BUCKETS
    """)
sql("""
    CREATE TABLE t2 (`id` BIGINT, `value` INT)
    USING PARQUET
    CLUSTERED BY (id)
    INTO 1 BUCKETS
    """)
sql("INSERT INTO TABLE t1 VALUES(1, 2)")
sql("INSERT INTO TABLE t2 VALUES(1, 3)")

# aggregation, note the exchange after the union operation
sql("""
    SELECT id, COUNT(*)
    FROM (SELECT id FROM t1 UNION SELECT id FROM t2)
    GROUP BY id
    """).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#92L], functions=[count(1)])
   +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      +- HashAggregate(keys=[id#92L], functions=[])
         +- Exchange hashpartitioning(id#92L, 100), ENSURE_REQUIREMENTS, [id=#202]
            +- HashAggregate(keys=[id#92L], functions=[])
               +- Union
                  :- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
 {code}
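When comparing plans like the one above, it can help to count the exchange operators mechanically rather than by eye. The following is a minimal sketch, not part of the original report: `count_exchanges` is a hypothetical helper that takes the text printed by explain() and counts plan nodes whose operator name is exactly `Exchange`, as in the plan above. It assumes the tree-drawing prefix characters shown in this report and deliberately ignores other operators whose names merely contain the word, such as broadcast exchanges.

```python
import re

# Matches a plan line whose operator name is exactly "Exchange", preceded only
# by the tree-drawing characters used in explain() output (spaces, ':', '+', '-').
_EXCHANGE_NODE = re.compile(r"^[\s:+\-]*Exchange\b", re.MULTILINE)


def count_exchanges(plan_text: str) -> int:
    """Return the number of shuffle Exchange nodes in a physical plan dump.

    plan_text is the text printed by explain(); this helper is hypothetical
    and only inspects the text, it does not talk to Spark.
    """
    return len(_EXCHANGE_NODE.findall(plan_text))


if __name__ == "__main__":
    # Abbreviated copy of the union/aggregate plan from this report.
    union_plan = """== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#92L], functions=[count(1)])
   +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      +- HashAggregate(keys=[id#92L], functions=[])
         +- Exchange hashpartitioning(id#92L, 100), ENSURE_REQUIREMENTS, [id=#202]
            +- HashAggregate(keys=[id#92L], functions=[])
               +- Union
"""
    print(count_exchanges(union_plan))
```

Because the plans here are reported with isFinalPlan=false, the count describes the initial plan only; adaptive execution may still change the plan at runtime.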
This looks like an issue with the query optimizer: if you use a different set and order of operations (in this case a groupby/count on each individual table, a join of the two results, and then inferring the union count from the joined values), you get a query plan that does not have this exchange step.
{code:java}
sql("""
    SELECT t1_agg.id, t1_agg.count + t2_agg.count as count
    FROM (SELECT id, COUNT(*) as count from t1 GROUP BY id) as t1_agg
    JOIN (SELECT id, COUNT(*) as count from t2 GROUP BY id) as t2_agg ON t1_agg.id=t2_agg.id
""").explain()

# note the lack of an exchange step
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#92L, (count#121L + count#122L) AS count#123L]
   +- SortMergeJoin [id#92L], [id#94L], Inner
      :- Sort [id#92L ASC NULLS FIRST], false, 0
      :  +- HashAggregate(keys=[id#92L], functions=[count(1)])
      :     +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      :        +- Filter isnotnull(id#92L)
      :           +- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#92L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
      +- Sort [id#94L ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[id#94L], functions=[count(1)])
            +- HashAggregate(keys=[id#94L], functions=[partial_count(1)])
               +- Filter isnotnull(id#94L)
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#94L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1 {code}
It feels like the first query (union, then aggregate) should be able to avoid the exchange step, just as the second one does.
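The reasoning behind this expectation can be sketched in plain Python, without Spark. The assumption is that both tables are bucketed on id with the same number of buckets and the same hash function, so every row for a given id lands in the same bucket index in both tables. Under that assumption, unioning and aggregating bucket by bucket gives the same answer as aggregating over all the data, with no data movement between buckets. All names below (`bucket_of`, `write_bucketed`, `per_bucket_union_count`, `global_union_count`) are hypothetical, and `key % num_buckets` is only a stand-in for Spark's actual bucketing hash.

```python
from collections import Counter, defaultdict

NUM_BUCKETS = 4  # illustrative only; the tables in this report use 1 bucket


def bucket_of(key: int, num_buckets: int = NUM_BUCKETS) -> int:
    # Stand-in for the real bucketing hash: any deterministic function of the
    # key is enough for the argument, as long as both tables use the same one.
    return key % num_buckets


def write_bucketed(ids, num_buckets: int = NUM_BUCKETS):
    """Split a table's id column into buckets, as CLUSTERED BY (id) would."""
    buckets = defaultdict(list)
    for key in ids:
        buckets[bucket_of(key, num_buckets)].append(key)
    return buckets


def global_union_count(t1_ids, t2_ids):
    """SELECT id, COUNT(*) FROM (t1 UNION t2) GROUP BY id, over all rows."""
    distinct_ids = set(t1_ids) | set(t2_ids)  # UNION removes duplicates
    return dict(Counter(distinct_ids))


def per_bucket_union_count(t1_ids, t2_ids, num_buckets: int = NUM_BUCKETS):
    """Same query, evaluated one bucket at a time with no cross-bucket shuffle."""
    b1 = write_bucketed(t1_ids, num_buckets)
    b2 = write_bucketed(t2_ids, num_buckets)
    result = {}
    for b in range(num_buckets):
        # Bucket b of t1 and bucket b of t2 together hold every row whose id
        # hashes to b, so this bucket can be aggregated independently.
        distinct_ids = set(b1.get(b, [])) | set(b2.get(b, []))
        result.update(Counter(distinct_ids))
    return result


if __name__ == "__main__":
    t1 = [1, 5, 5, 2]
    t2 = [1, 3]
    print(per_bucket_union_count(t1, t2) == global_union_count(t1, t2))
```

This only models the data layout. Whether Spark's planner can propagate the bucketed partitioning through the Union operator is exactly the open question of this issue.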

 


