Posted to issues@spark.apache.org by "Cheng Su (Jira)" <ji...@apache.org> on 2020/08/17 06:25:00 UTC

[jira] [Commented] (SPARK-32634) Introduce sort-based fallback mechanism for shuffled hash join

    [ https://issues.apache.org/jira/browse/SPARK-32634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178746#comment-17178746 ] 

Cheng Su commented on SPARK-32634:
----------------------------------

[~cloud_fan] - wondering what you think of this idea? Would like to get your opinion before sending out a PR. Please let me know if this is a major change from your perspective and if we need more discussion on the dev mailing list. Thanks.

> Introduce sort-based fallback mechanism for shuffled hash join 
> ---------------------------------------------------------------
>
>                 Key: SPARK-32634
>                 URL: https://issues.apache.org/jira/browse/SPARK-32634
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Priority: Minor
>
> A major pain point that keeps Spark users away from shuffled hash join is the out-of-memory issue. Shuffled hash join tends to OOM because it allocates an in-memory hashed relation (`UnsafeHashedRelation` or `LongHashedRelation`) for the build side, and there is no recovery once the hashed relation grows too large to fit in memory. On the other hand, shuffled hash join is more CPU and IO efficient than sort merge join when joining one large table with a small table (where the small table is still too large to be broadcast), as SHJ does not sort the large table while SMJ must.
> To improve the reliability of shuffled hash join, a fallback mechanism can be introduced to avoid the shuffled hash join OOM issue completely, similar to the existing fallback from hash aggregate to sort-based aggregation. The idea is:
> (1). Build the hashed relation as we do today, but monitor its size while inserting each build side row. If the size of the hashed relation stays below a configurable threshold, go to (2.1); otherwise go to (2.2).
> (2.1). Current shuffled hash join logic: read stream side rows and probe the hashed relation.
> (2.2). Fall back to sort merge join: sort the stream side rows, and sort the build side rows (iterate over the rows already in the hashed relation, e.g. through `BytesToBytesMap.destructiveIterator`, then iterate over the remaining un-read build side rows). Then do a sort merge join on the stream and build side rows. A simplified sketch of this per-task decision follows below.
>  
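> For illustration, here is a minimal, self-contained Scala sketch of the per-task decision above. It uses plain Scala collections instead of Spark's `HashedRelation`/`UnsafeRow` internals, and every name in it (`buildWithFallback`, `sizeThresholdBytes`, the crude per-row size estimate) is hypothetical: this is not actual Spark code, only the control flow being proposed.
> {code:scala}
> // Sketch only: plain Scala stand-in for the proposed fallback inside the SHJ operator.
> // Rows are modeled as (key, value) pairs; real code would use UnsafeRow and
> // HashedRelation, and the threshold would come from a new SQL config.
> object ShuffledHashJoinFallbackSketch {
>
>   sealed trait BuildResult
>   // (2.1) the whole build side fit under the threshold: probe the map as today.
>   final case class Hashed(map: Map[Int, Seq[String]]) extends BuildResult
>   // (2.2) the threshold was exceeded: hand back every build row (already inserted
>   // plus not-yet-read) so the caller can sort both sides and do a sort merge join.
>   final case class FallBackToSortMergeJoin(allBuildRows: Iterator[(Int, String)]) extends BuildResult
>
>   def buildWithFallback(
>       buildRows: Iterator[(Int, String)],
>       sizeThresholdBytes: Long): BuildResult = {
>     val map = scala.collection.mutable.Map.empty[Int, Vector[String]]
>     var estimatedSizeBytes = 0L
>     while (buildRows.hasNext) {
>       val (key, value) = buildRows.next()
>       map.update(key, map.getOrElse(key, Vector.empty) :+ value)
>       // Crude stand-in for HashedRelation.estimatedSize.
>       estimatedSizeBytes += 4L + value.length
>       if (estimatedSizeBytes > sizeThresholdBytes) {
>         // Stop inserting: emit the rows already in the map (the "destructive
>         // iterator" part), followed by the remaining un-read build rows.
>         val inserted = map.iterator.flatMap { case (k, vs) => vs.iterator.map(v => (k, v)) }
>         return FallBackToSortMergeJoin(inserted ++ buildRows)
>       }
>     }
>     Hashed(map.toMap) // proceed with the normal shuffled hash join probe
>   }
> }
> {code}
> With this shape, the operator would probe the map in the `Hashed` case exactly as today, and in the `FallBackToSortMergeJoin` case feed `allBuildRows` plus the stream side into the existing sort merge join path.
>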
> Note:
> (1). The fallback is dynamic and happens per task, which means task 0 can incur the fallback (e.g. if it has a big build side), while tasks 1 and 2 may not, depending on the size of their hashed relations.
> (2). There is no major code change for SHJ and SMJ. The major change is around HashedRelation, to introduce some new methods, e.g. `HashedRelation.destructiveValues()` to return an Iterator of the build side rows in the hashed relation while cleaning up the hashed relation along the way (a hypothetical sketch of this idea follows after this note).
> (3). We have run this feature by default in our internal fork for more than 2 years, and we benefit a lot from it: users can choose to use SHJ, and we don't need to worry about SHJ reliability (see https://issues.apache.org/jira/browse/SPARK-21505 for the original proposal from our side; I have tweaked it here to make it less intrusive and more acceptable, e.g. by not introducing a separate join operator but doing the fallback automatically inside the SHJ operator itself).
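>
> As a companion to note (2), here is a toy illustration of the proposed `destructiveValues()` idea. The trait, the method, and the in-memory class below are all hypothetical; Spark's real `HashedRelation` implementations are backed by `BytesToBytesMap` / `LongToUnsafeRowMap` and would release their off-heap memory as they iterate, rather than clearing a Scala buffer.
> {code:scala}
> // Toy relation: destructiveValues() returns every stored build-side row and frees
> // the storage along the way, analogous to BytesToBytesMap.destructiveIterator, so
> // the rows can be re-sorted for the sort-merge-join fallback.
> trait ToyHashedRelation[K, V] {
>   def get(key: K): Seq[V]
>   def destructiveValues(): Iterator[V]
> }
>
> final class ToyInMemoryRelation[K, V](rows: Seq[(K, V)]) extends ToyHashedRelation[K, V] {
>   private val buffer = scala.collection.mutable.Map.empty[K, Vector[V]]
>   rows.foreach { case (k, v) => buffer.update(k, buffer.getOrElse(k, Vector.empty) :+ v) }
>
>   override def get(key: K): Seq[V] = buffer.getOrElse(key, Vector.empty)
>
>   override def destructiveValues(): Iterator[V] = {
>     val snapshot = buffer.values.flatten.toVector
>     buffer.clear() // "destructive": the relation must not be probed after this call
>     snapshot.iterator
>   }
> }
> {code}
> In the fallback path, the operator would call `destructiveValues()` once, sort the returned rows together with the remaining un-read build side rows, and then run the regular sort merge join.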



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org