Posted to issues@spark.apache.org by "Marco Gaido (JIRA)" <ji...@apache.org> on 2018/07/25 10:09:00 UTC

[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

    [ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555477#comment-16555477 ] 

Marco Gaido commented on SPARK-24904:
-------------------------------------

You cannot broadcast the small table when the outer join preserves the small table's side (as in the right join here). Such a join has to compare each row of the small table against the whole big table and still output the row, padded with nulls, if it found no match. Since each task sees only a small piece of the big table, no task can determine whether a given row matched at least once, because it doesn't know what the other tasks found.
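
To make this concrete, here is a toy model in plain Scala, with no Spark involved. The object and method names are made up for this sketch, and the "partitions" are just in-memory sequences. It only illustrates the reasoning above: if every task ran the right outer join locally against a full copy of the small table, each task would emit a null-padded row for every small-table row it did not see a match for.

```scala
// Toy model (hypothetical names, not Spark code): each "task" holds one
// partition of the big table plus a full, broadcast-style copy of the small one.
object BroadcastRightOuterSketch {

  // Right outer join of one slice of `big` against all of `small`,
  // evaluated with only the information a single task would have.
  def localRightOuter(bigPart: Seq[Long], small: Seq[Long]): Seq[(Option[Long], Long)] =
    small.flatMap { s =>
      val matches = bigPart.filter(_ == s)
      if (matches.isEmpty) Seq((None, s))   // looks unmatched from this task's point of view
      else matches.map(b => (Some(b), s))
    }

  val small: Seq[Long] = Seq(1L, 2L, 3L)

  // Two partitions of the big table; key 3 has no match in either of them.
  val bigPartitions: Seq[Seq[Long]] = Seq(Seq(1L, 10L), Seq(2L, 20L))

  // What you would get by unioning the per-task results.
  val perTask: Seq[(Option[Long], Long)] =
    bigPartitions.flatMap(part => localRightOuter(part, small))

  // What the join should return when the whole big table is visible at once.
  val correct: Seq[(Option[Long], Long)] =
    localRightOuter(bigPartitions.flatten, small)

  def main(args: Array[String]): Unit = {
    println(s"per-task union: $perTask")
    println(s"correct result: $correct")
  }
}
```

The per-task union contains spurious null-padded rows for keys 1 and 2 (each was matched by the other task) and two copies of the unmatched key 3, which is why the planner cannot use this strategy for the preserved side of an outer join.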

> Join with broadcasted dataframe causes shuffle of redundant data
> ----------------------------------------------------------------
>
>                 Key: SPARK-24904
>                 URL: https://issues.apache.org/jira/browse/SPARK-24904
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 2.1.2
>            Reporter: Shay Elbaz
>            Priority: Minor
>
> When joining a "large" dataframe with a broadcasted small one, and the outer join preserves the small DF's side (see the right join below), the physical plan does not broadcast the small table. But when the join preserves the large DF's side, the broadcast does take place. Is there a good reason for this? In the example below it doesn't make sense to shuffle the entire large table:
>  
> {code:java}
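> // Needed outside spark-shell, which already provides both imports
> import org.apache.spark.sql.functions.broadcast
> import spark.implicits._
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
>   .explain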
> //OUTPUT:
> == Physical Plan ==
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id2#16307L, 1000)
> :     +- *Project [id#16304L AS id2#16307L]
> :        +- *Range (1, 1073741824, step=1, splits=Some(600))
> +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#16310L, 1000)
>       +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform an inner join instead of the right join, and then join the result back with the small DF to fill in the missing rows.
>  

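The workaround described in the issue could look roughly like the sketch below. It is not taken from the issue itself: the data is scaled down so it runs locally, `big` deliberately starts at 5 so that some rows of `small` have no match, and the names `matched` and `result` are made up. It also assumes the join key is unique in the small DataFrame; with duplicate keys in `small`, the second join would multiply rows.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Local session for the sketch; in spark-shell this returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-right-join-workaround")
  .getOrCreate()
import spark.implicits._

// Scaled-down, hypothetical data: ids 1..4 of `small` have no match in `big`.
val small = spark.range(1, 10)                                    // column "id"
val big = spark.range(5, 1 << 20).withColumnRenamed("id", "id2")  // column "id2"

// Step 1: inner join. Broadcasting `small` is allowed here, so `big`
// does not need to be shuffled for this join.
val matched = big.join(broadcast(small), $"id" === $"id2", "inner")

// Step 2: join back to `small` to restore its unmatched rows with a null id2.
// Assumes "id" is unique in `small` (see the note above).
val result = small
  .join(matched.select($"id2"), $"id" === $"id2", "left_outer")
  .select($"id2", $"id")

matched.explain()
result.show()
```

The second join still needs the matched rows brought together with `small`, but it only moves the output of the inner join rather than the whole big table.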

