Posted to issues@spark.apache.org by "Eyal Farago (JIRA)" <ji...@apache.org> on 2018/08/22 20:57:00 UTC

[jira] [Commented] (SPARK-25203) spark sql, union all does not propagate child partitioning (when possible)

    [ https://issues.apache.org/jira/browse/SPARK-25203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589332#comment-16589332 ] 

Eyal Farago commented on SPARK-25203:
-------------------------------------

CC: [~hvanhovell], [~cloud_fan]

> spark sql, union all does not propagate child partitioning (when possible)
> --------------------------------------------------------------------------
>
>                 Key: SPARK-25203
>                 URL: https://issues.apache.org/jira/browse/SPARK-25203
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0
>            Reporter: Eyal Farago
>            Priority: Major
>
> In Spark SQL, UNION ALL does not propagate partitioning when all child plans have the same partitioning. This introduces unnecessary Exchange nodes when a parent operator requires a distribution that this partitioning already satisfies.
>  
> {code:java}
> CREATE OR REPLACE TEMPORARY VIEW t1 AS VALUES (1, 'a'), (2, 'b') tbl(c1, c2);
> CREATE OR REPLACE TEMPORARY VIEW t1D1 AS select c1, c2 from t1 distribute by c1;
> CREATE OR REPLACE TEMPORARY VIEW t1D2 AS select c1 + 1 as c11, c2 from t1 distribute by c11;
> create or REPLACE TEMPORARY VIEW t1DU as
> select * from t1D1
> UNION ALL
> select * from t1D2;
> EXPLAIN select * from t1DU distribute by c1;
> == Physical Plan ==
> Exchange hashpartitioning(c1#x, 200)
> +- Union
>    :- Exchange hashpartitioning(c1#x, 200)
>    :  +- LocalTableScan [c1#x, c2#x]
>    +- Exchange hashpartitioning(c11#x, 200)
>       +- LocalTableScan [c11#x, c2#x]
> {code}
> The Exchange introduced in the last query is unnecessary, since the unioned data is already partitioned by column _c1_. In fact, the equivalent RDD operation identifies this scenario and introduces a PartitionerAwareUnionRDD, which maintains the children's shared partitioner.
> I suggest modifying org.apache.spark.sql.execution.UnionExec by overriding _outputPartitioning_ so that it identifies a partitioning common to all child plans and uses it (falling back to the default implementation otherwise).
> Furthermore, it seems the current implementation does not properly cluster the data:
> {code:java}
> select *, spark_partition_id() as P  from t1DU distribute by c1
> -- !query 15 schema
> struct<c1:int,c2:string,P:int>
> -- !query 15 output
> 1	a	43
> 2	a	374
> 2	b	174
> 3	b	251
> {code}
> Notice that _c1=2_ appears in both partition 174 and partition 374.
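To make the proposal in the description concrete, here is a minimal Python model of the two ideas. It is not Spark code. The names `HashPartitioning`, `union_output_partitioning`, `concat_union` and `partitioner_aware_union` are hypothetical stand-ins chosen for illustration. The sketch assumes that a plain union concatenates the children's partitions (which would explain ids above 199 with 200 shuffle partitions) and that the reported P values are partition ids at the Union's output. Under those assumptions, the second child's rows sit at 374 - 200 = 174 and 251 - 200 = 51 within that child.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class HashPartitioning:
    """Toy stand-in: positions of the key columns plus the partition count."""
    key_ordinals: Tuple[int, ...]
    num_partitions: int


def union_output_partitioning(
    children: List[Optional[HashPartitioning]],
) -> Optional[HashPartitioning]:
    """Return the partitioning shared by all children, else None (unknown).

    A union matches columns by position, so partitionings are compared by
    column ordinal rather than by column name (c1 vs. c11 in the report).
    """
    if not children or children[0] is None:
        return None
    first = children[0]
    return first if all(c == first for c in children[1:]) else None


def concat_union(children):
    """Concatenate child partitions: child k's partition i lands at an offset."""
    return [part for child in children for part in child]


def partitioner_aware_union(children):
    """Merge partition i of every child into partition i of the result."""
    return [[row for part in parts for row in part] for parts in zip(*children)]


def partition_ids(partitions, key):
    """Ids of the partitions holding at least one row whose first column is key."""
    return [i for i, part in enumerate(partitions)
            if any(row[0] == key for row in part)]


N = 200  # partition count taken from hashpartitioning(..., 200) in the plan


def place(rows_with_ids):
    """Build N partitions and drop each row into the given partition id."""
    parts = [[] for _ in range(N)]
    for pid, row in rows_with_ids:
        parts[pid].append(row)
    return parts


# Per-child partition ids inferred from the reported output (an assumption).
t1d1 = place([(43, (1, "a")), (174, (2, "b"))])
t1d2 = place([(174, (2, "a")), (51, (3, "b"))])

print(partition_ids(concat_union([t1d1, t1d2]), 2))             # [174, 374]
print(partition_ids(partitioner_aware_union([t1d1, t1d2]), 2))  # [174]

same = HashPartitioning((0,), N)
print(union_output_partitioning([same, same]))                  # shared partitioning
print(union_output_partitioning([same, HashPartitioning((1,), N)]))  # None
```

In this model, key 2 hashes to partition 174 in both children. Concatenation alone splits it across 174 and 374, while a partitioner-aware merge keeps it in one partition. A real implementation would also need to confirm that the children's partitioning expressions are semantically equivalent, not just equal by position.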


