You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Eyal Farago (JIRA)" <ji...@apache.org> on 2018/09/14 11:38:00 UTC

[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

    [ https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614678#comment-16614678 ] 

Eyal Farago commented on SPARK-24410:
-------------------------------------

[~viirya], I've opened SPARK-25203 because of your answer on August 13th.
however looking at your PR it seems you opted to solve the general case, and bucketed table is just a private case of the general case.
do you think SPARK-25203 should be closed as a duplicate of this? if so can you please add the example used there to this issue as well?

> Missing optimization for Union on bucketed tables
> -------------------------------------------------
>
>                 Key: SPARK-24410
>                 URL: https://issues.apache.org/jira/browse/SPARK-24410
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Ohad Raviv
>            Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily increments that we need to further aggregate. we do this my unioning the two tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it seems that the union operator doesn't leverage the tables being bucketed (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
>     sparkSession.range(N).selectExpr(
>       "id as key",
>       "id % 2 as t1",
>       "id % 3 as t2")
>         .repartition(col("key"))
>         .write
>       .mode(SaveMode.Overwrite)
>         .bucketBy(3, "key")
>         .sortBy("t1")
>         .saveAsTable("a1")
>     sparkSession.range(N).selectExpr(
>       "id as key",
>       "id % 2 as t1",
>       "id % 3 as t2")
>       .repartition(col("key"))
>       .write.mode(SaveMode.Overwrite)
>       .bucketBy(3, "key")
>       .sortBy("t1")
>       .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> :     +- *(1) Filter isnotnull(key#24L)
> :        +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>    +- *(2) Project [key#27L, t1#28L, t2#29L]
>       +- *(2) Filter isnotnull(key#27L)
>          +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>    +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], output=[key#25L, count#38L])
>       +- Union
>          :- *(1) Project [key#25L]
>          :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
>          +- *(2) Project [key#28L]
>             +- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org