You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Marco Gaido (JIRA)" <ji...@apache.org> on 2018/07/25 13:29:00 UTC

[jira] [Comment Edited] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

    [ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555652#comment-16555652 ] 

Marco Gaido edited comment on SPARK-24904 at 7/25/18 1:28 PM:
--------------------------------------------------------------

I see now what you mean, but yes, I think there is an assumption you are doing which is not always true, ie. "The output is (expected to be) very small compared to the big table". That is not true. If all the rows from the big table match the small one, this is not the case. We may trying to do something like what you mentioned in the optimizer if CBO is enabled and we have good enough statistics about the output size of the inner join, but i am not sure.


was (Author: mgaido):
I see now what you mean, but yes, It think there is an assumption you are doing which is not always true, ie. "The output is (expected to be) very small compared to the big table". That is not true. If all the rows from the big table match the small one, this is not the case. We may trying to do something like what you mentioned in the optimizer if CBO is enabled and we have good enough statistics about the output size of the inner join, but i am not sure.

> Join with broadcasted dataframe causes shuffle of redundant data
> ----------------------------------------------------------------
>
>                 Key: SPARK-24904
>                 URL: https://issues.apache.org/jira/browse/SPARK-24904
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.2
>            Reporter: Shay Elbaz
>            Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is on the small DF side (see right-join below), the physical plan falls back to sort merge join. But when the join is on the large DF side, the broadcast does take place. Is there a good reason for this? In the below example it sure doesn't make any sense to shuffle the entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  :     +- *Project [id#16304L AS id2#16307L]
>  :        +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>        +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then join the result back with the small DF to fill the missing rows.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org