Posted to issues@spark.apache.org by "Jason Guo (JIRA)" <ji...@apache.org> on 2019/06/02 00:54:00 UTC

[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join

     [ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Guo updated SPARK-27792:
------------------------------
    Shepherd:   (was: Dongjoon Hyun)

> SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-27792
>                 URL: https://issues.apache.org/jira/browse/SPARK-27792
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Jason Guo
>            Priority: Major
>         Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, time.png
>
>
> This feature is designed to handle data skew in joins.
>  
> *Scenario*
>  * A big table (big_skewed) that contains a few skewed keys
>  * A small table (small_even) that has no skewed keys but is larger than the broadcast threshold
>  * When running big_skewed.join(small_even), a few tasks are much slower than the others because they must process the skewed keys
> *Solution*
>  * Provide a hint that specifies which keys are skewed
>  * Join the records with skewed keys using a broadcast join, and join the records with non-skewed keys using the normal join method
>  * The small table as a whole is larger than the broadcast threshold. However, the subset of its records whose keys are skewed in the big table is smaller than the threshold, so that subset can be broadcast and joined with the big table
>  * The records with non-skewed keys are joined with the normal join method
>  * The final result is the union of these two partial results
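The Solution steps can be sketched in plain Python to show why the union is a correct inner join. Every key either is or is not in the hinted skewed set, and two rows that match share a key. As a result, each matching pair is produced by exactly one of the two partial joins. This is a hypothetical single-machine illustration, not Spark code. The names `skew_join`, `broadcast_hash_join`, `sort_merge_join` and `estimated_size_bytes` are introduced here. The byte estimate is only a crude stand-in for Spark's statistics-based size check against spark.sql.autoBroadcastJoinThreshold.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple

Row = Tuple[str, str]          # (join key, value)
Joined = Tuple[str, str, str]  # (join key, big-side value, small-side value)


def estimated_size_bytes(rows: Iterable[Row]) -> int:
    # Hypothetical, crude size estimate: UTF-8 length of every field.
    return sum(len(k.encode()) + len(v.encode()) for k, v in rows)


def broadcast_hash_join(big: Iterable[Row], small: Iterable[Row]) -> List[Joined]:
    # Build a hash table from the small side (the part that would be broadcast),
    # then stream the big side through it without shuffling the big side.
    table: Dict[str, List[str]] = defaultdict(list)
    for key, value in small:
        table[key].append(value)
    return [(key, bv, sv) for key, bv in big for sv in table.get(key, [])]


def sort_merge_join(left: List[Row], right: List[Row]) -> List[Joined]:
    # Sort both sides by key, advance two cursors, and emit the cross
    # product of each run of equal keys.
    left = sorted(left, key=lambda r: r[0])
    right = sorted(right, key=lambda r: r[0])
    out: List[Joined] = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            i_end, j_end = i, j
            while i_end < len(left) and left[i_end][0] == lk:
                i_end += 1
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            out.extend((lk, a[1], b[1]) for a in left[i:i_end] for b in right[j:j_end])
            i, j = i_end, j_end
    return out


def skew_join(big: List[Row], small: List[Row], skewed_keys: Set[str],
              threshold_bytes: int) -> List[Joined]:
    # Split both inputs by whether the key was hinted as skewed.
    big_skew = [r for r in big if r[0] in skewed_keys]
    big_rest = [r for r in big if r[0] not in skewed_keys]
    small_skew = [r for r in small if r[0] in skewed_keys]
    small_rest = [r for r in small if r[0] not in skewed_keys]
    # Only the skewed slice of the small table has to fit under the threshold.
    if estimated_size_bytes(small_skew) > threshold_bytes:
        raise ValueError("skewed slice of the small table is too large to broadcast")
    # The union of the two partial joins is the full inner join.
    return broadcast_hash_join(big_skew, small_skew) + sort_merge_join(big_rest, small_rest)


big = [("9500048", "A")] * 5 + [("1", "A"), ("2", "A")]
small = [("9500048", "B"), ("1", "B"), ("3", "B")]
print(len(skew_join(big, small, {"9500048"}, threshold_bytes=3000)))  # prints 6
```

In this toy run, the five rows with the hot key are handled by the hash-table lookup, and only the remaining rows go through the sort-and-merge path.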
> *Effect*
> This feature reduces the join time from 5.7 minutes to 2.1 minutes in the experiment below
> !time.png!
> !sql.png!  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> big_skewed has 2 skewed keys, 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 900000000 AND 1050000000;{code}
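The CASE expression yields only two keys for ids below 908000000, because CAST(RAND() * 2 AS INT) is either 0 or 1. The hypothetical helper below mirrors that expression in Python, with Python's `random` standing in for Spark's RAND(). It also works through the row-count arithmetic. The 8,000,000 ids from 900000000 to 907999999 are split roughly evenly between the two skewed keys, about 4,000,000 rows each. By contrast, a key from the ELSE branch covers only 100 consecutive ids.

```python
import random


def big_skewed_key(row_id: int, rng: random.Random) -> str:
    # Hypothetical mirror of the CASE expression used to populate big_skewed.
    if row_id < 908000000:
        # int(rng.random() * 2) is 0 or 1, so the offset is 48 or 96.
        return str(9500000 + (int(rng.random() * 2) + 1) * 48)
    # For positive ids, CAST(id/100 AS INT) truncates, i.e. floor division.
    return str(row_id // 100)


rng = random.Random(0)
sample = {big_skewed_key(i, rng) for i in range(900000000, 900010000)}
print(sorted(sample))  # ['9500048', '9500096']

skewed_rows = 908000000 - 900000000  # rows assigned to the two skewed keys
rows_per_normal_key = 100            # ids k*100 .. k*100+99 share key k
print(skewed_rows // 2 // rows_per_normal_key)  # 40000
```

Each skewed key therefore carries roughly 40,000 times as many rows as an ordinary key.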
> small_even has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 950000000 AND 950500000;{code}
>  
> Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 (bytes)
> {code:java}
> insert overwrite table result_with_skew
> select big_skewed.id, big_skewed.value, small_even.value
> from big_skewed
> join small_even
> on small_even.id=big_skewed.id;
> {code}
>  
> The sort-merge join is slow because of 2 straggler tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
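A toy model of the shuffle before the sort-merge join illustrates why only a couple of tasks straggle. Rows are routed to partitions by a hash of the join key, so all rows of one key end up in the same task. This sketch rests on several assumptions. It uses `zlib.crc32` instead of Spark's Murmur3-based hash partitioning, a scaled-down set of 5,000 ordinary keys, and 200 partitions (Spark's default spark.sql.shuffle.partitions). The skewed row counts follow the data-generation query for big_skewed.

```python
import zlib
from typing import Dict, List


def partition_loads(key_counts: Dict[str, int], num_partitions: int) -> List[int]:
    # Every key goes to exactly one partition, so a hot key cannot be spread out.
    # crc32 is only a deterministic stand-in for Spark's Murmur3-based partitioner.
    loads = [0] * num_partitions
    for key, rows in key_counts.items():
        loads[zlib.crc32(key.encode()) % num_partitions] += rows
    return loads


# Scaled-down picture: 5,000 ordinary keys with 100 rows each, plus
# roughly 4,000,000 extra rows on each of the two skewed keys.
counts = {str(k): 100 for k in range(9500000, 9505000)}
counts["9500048"] += 4_000_000
counts["9500096"] += 4_000_000

loads = partition_loads(counts, 200)
stragglers = [p for p, rows in enumerate(loads) if rows > 1_000_000]
print(len(stragglers), max(loads), sorted(loads)[len(loads) // 2])
```

Depending on where the two skewed keys hash, one or two partitions carry millions of rows, while the median partition holds on the order of a few thousand. This matches the shape of the straggler tasks in the sort-merge join.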
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joined with a broadcast join and the non-skewed keys with a sort-merge join
> !skew join DAG.png!  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
