Posted to issues@spark.apache.org by "Eyal Farago (JIRA)" <ji...@apache.org> on 2018/08/13 15:49:00 UTC

[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

    [ https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578489#comment-16578489 ] 

Eyal Farago commented on SPARK-24410:
-------------------------------------

[~viirya], I think your conclusion about co-partitioning is wrong. Consider the following code segment from your comment:
{code:java}
val df1 = spark.table("a1").select(spark_partition_id(), $"key")
val df2 = spark.table("a2").select(spark_partition_id(), $"key")
df1.union(df2).select(spark_partition_id(), $"key").collect
{code}
This prints the partition ids as assigned by the union. Assuming union simply concatenates the partitions of df1 and df2 and assigns them running ids, it makes sense that you'd get two partitions per key: one coming from df1 and the other from df2.

Applying this select to each dataframe separately, you'd get the exact same results, meaning a given key has the same partition id in both dataframes.
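
To make the argument concrete, here is a minimal sketch in plain Scala (no Spark involved). It is only a toy model: {{bucketOf}}, {{bucketed}}, {{union}} and {{partitionIdsOf}} are hypothetical helpers, the modulo bucketing stands in for Spark's real hash function, and the "concatenate and renumber" behaviour of union is the assumption stated above, not something taken from the Spark source.

```scala
// Toy model (not Spark code): a "table" is a vector of partitions, each a vector of keys.
object UnionPartitionIds {
  type Table = Vector[Vector[Long]]

  // Hypothetical bucketing function for non-negative keys; Spark's real one hashes the key.
  def bucketOf(key: Long, numBuckets: Int): Int = (key % numBuckets).toInt

  // Lay the keys out so that partition i holds exactly the keys of bucket i.
  def bucketed(keys: Seq[Long], numBuckets: Int): Table =
    Vector.tabulate(numBuckets)(b => keys.filter(k => bucketOf(k, numBuckets) == b).toVector)

  // Assumed union behaviour: concatenate the children's partitions and renumber them.
  def union(left: Table, right: Table): Table = left ++ right

  // All partition ids in which a key occurs.
  def partitionIdsOf(key: Long, table: Table): Seq[Int] =
    table.zipWithIndex.collect { case (part, id) if part.contains(key) => id }

  def main(args: Array[String]): Unit = {
    val a1 = bucketed(0L until 9L, 3)
    val a2 = bucketed(0L until 9L, 3)
    val key = 4L
    println(partitionIdsOf(key, a1))            // the id of the key's bucket in a1
    println(partitionIdsOf(key, a2))            // the same id in a2
    println(partitionIdsOf(key, union(a1, a2))) // two ids after the union, numBuckets apart
  }
}
```

In this model the two ids reported after the union differ by exactly the number of buckets, which is what you'd expect if the children are co-partitioned and union merely renumbers their partitions.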

I think this code fragment basically shows what's wrong with the current implementation of Union, not that we can't optimize unions of co-partitioned relations.

If Union were a bit more 'partitioning aware', it would be able to identify that both children have the same partitioning scheme and 'inherit' it. As you actually showed, this might be a bit tricky because the same logical attribute has a different expression id in each child, but Union eventually maps these child attributes into a single output attribute, so this information can be used to resolve the partitioning columns and determine their equality.
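
A rough sketch of that resolution step, again in plain Scala rather than Catalyst: {{Attr}}, {{HashPart}}, {{Child}}, {{positional}} and {{inherited}} are all made-up names for illustration, and the only thing assumed about Union is that it maps child attributes to its output by position.

```scala
// Toy model (not Catalyst classes): attributes carry a name and a per-child expression id.
object UnionPartitioningSketch {
  final case class Attr(name: String, exprId: Long)
  // Hypothetical stand-in for a hash partitioning on some of the child's attributes.
  final case class HashPart(exprIds: Seq[Long], numPartitions: Int)
  final case class Child(output: Seq[Attr], partitioning: HashPart)

  // Union maps child attributes to its output by position, so rewrite each
  // partitioning column as the index of the output column it feeds.
  def positional(child: Child): Option[(Seq[Int], Int)] = {
    val positions = child.partitioning.exprIds.map(id => child.output.indexWhere(_.exprId == id))
    if (positions.contains(-1)) None // partitioned on a column the union does not output
    else Some((positions, child.partitioning.numPartitions))
  }

  // The union can 'inherit' a partitioning only if every child resolves to the same one.
  def inherited(children: Seq[Child]): Option[(Seq[Int], Int)] =
    children.map(positional).distinct match {
      case Seq(Some(common)) => Some(common)
      case _                 => None
    }

  def main(args: Array[String]): Unit = {
    // Same logical column "key", different expression ids in each child.
    val a1 = Child(Seq(Attr("key", 24L), Attr("t1", 25L)), HashPart(Seq(24L), 3))
    val a2 = Child(Seq(Attr("key", 27L), Attr("t1", 28L)), HashPart(Seq(27L), 3))
    println(inherited(Seq(a1, a2)))
  }
}
```

Comparing positions instead of expression ids is what makes key#24L and key#27L count as the same partitioning column here; a real rule would presumably also have to compare the partitioning kind and sort order.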

Furthermore, Union being smarter about its output partitioning won't cut it on its own; a few rules have to be added or modified:

 1. Applying an exchange on a union should sometimes be pushed to the children: the children can be partitioned into those supporting the required partitioning and those not supporting it; the exchange is then applied to a union of the non-supporting children, and the result is unioned with the rest of the children.
 2. Partial aggregation also has to be pushed to the children, resulting in a union of partial aggregations. Again, it's possible to partition the children according to their support of the required partitioning.
 3. Final aggregation over a union introduces an exchange, which will then be pushed to the children; the aggregation is then applied on top of the partitioning-aware union (think of the way PartitionerAwareUnionRDD handles partitioning).
 * 'partition children' here means partitioning an array by a predicate (scala.collection.TraversableLike#partition).
 * Other operators, like join, may require additional rules.
 * Some of these ideas were discussed offline with [~hvanhovell].
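
Rule 1 could look roughly like the sketch below. This is a toy plan model, not Spark's SparkPlan: {{Scan}}, {{Union}}, {{Exchange}}, {{satisfies}} and {{pushDown}} are hypothetical, partitioning is reduced to a single column name, and the sketch assumes a partitioning-aware union that preserves the partitioning shared by all of its children.

```scala
// Toy physical plan (not Spark's SparkPlan): just enough structure to show the rewrite.
object PushExchangeThroughUnion {
  sealed trait Plan
  // partitionedBy = Some(column) when the scan is already hash-partitioned on that column.
  final case class Scan(table: String, partitionedBy: Option[String]) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan
  final case class Exchange(column: String, child: Plan) extends Plan

  // Assumption: a union keeps a partitioning only when all of its children share it.
  def satisfies(plan: Plan, column: String): Boolean = plan match {
    case Scan(_, partitionedBy) => partitionedBy.contains(column)
    case Exchange(c, _)         => c == column
    case Union(children)        => children.nonEmpty && children.forall(satisfies(_, column))
  }

  // Rewrite Exchange(Union(children)) so that only non-supporting children are shuffled.
  def pushDown(plan: Plan): Plan = plan match {
    case Exchange(column, Union(children)) =>
      // 'partition children' = split the children by a predicate
      val (supporting, nonSupporting) = children.partition(satisfies(_, column))
      if (nonSupporting.isEmpty) Union(supporting) // no shuffle needed at all
      else if (supporting.isEmpty) plan            // nothing to gain, keep the plan as is
      else Union(supporting :+ Exchange(column, Union(nonSupporting)))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val bucketed = Scan("a1", Some("key"))
    val daily    = Scan("daily", None) // hypothetical non-bucketed increment table
    println(pushDown(Exchange("key", Union(Seq(bucketed, daily)))))
  }
}
```

Rules 2 and 3 would follow the same shape, with the partial aggregate pushed below the union before the exchange is handled.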

> Missing optimization for Union on bucketed tables
> -------------------------------------------------
>
>                 Key: SPARK-24410
>                 URL: https://issues.apache.org/jira/browse/SPARK-24410
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Ohad Raviv
>            Priority: Major
>
> A common use-case we have is a partially aggregated table and daily increments that we need to further aggregate. We do this by unioning the two tables and aggregating again.
> We tried to optimize this process by bucketing the tables, but currently it seems that the union operator doesn't leverage the tables being bucketed (the way the join operator does).
> For example, for two bucketed tables a1, a2:
> {code}
>     sparkSession.range(N).selectExpr(
>         "id as key",
>         "id % 2 as t1",
>         "id % 3 as t2")
>       .repartition(col("key"))
>       .write
>       .mode(SaveMode.Overwrite)
>       .bucketBy(3, "key")
>       .sortBy("t1")
>       .saveAsTable("a1")
>     sparkSession.range(N).selectExpr(
>         "id as key",
>         "id % 2 as t1",
>         "id % 3 as t2")
>       .repartition(col("key"))
>       .write
>       .mode(SaveMode.Overwrite)
>       .bucketBy(3, "key")
>       .sortBy("t1")
>       .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> :     +- *(1) Filter isnotnull(key#24L)
> :        +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>    +- *(2) Project [key#27L, t1#28L, t2#29L]
>       +- *(2) Filter isnotnull(key#27L)
>          +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>    +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], output=[key#25L, count#38L])
>       +- Union
>          :- *(1) Project [key#25L]
>          :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
>          +- *(2) Project [key#28L]
>             +- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
> {code}


