You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2019/05/21 12:48:00 UTC

[jira] [Assigned] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join

     [ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-27792:
------------------------------------

    Assignee: Apache Spark

> SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-27792
>                 URL: https://issues.apache.org/jira/browse/SPARK-27792
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Jason Guo
>            Assignee: Apache Spark
>            Priority: Major
>         Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, time.png
>
>
> This feature is designed to handle data skew in Join
>  
> *Senario*
>  * A big table (big_skewed) which contains a a few skewed key
>  * A small table (small_even) which has no skewed key and is larger than the broadcast threshold 
>  * When big_skewed.join(small_even), a few tasks will be much slower than other tasks because they need to handle the skewed key
> *Effect*
> This feature reduce the join time from 5.7 minutes to 2.1 minutes
> !time.png!
> !sql.png!  
>  
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> tableA has 2 skewed keys 9500048 and 9500096
> {code:java}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 )
>  ELSE CAST(id/100 AS INT) END AS STRING), 'A'
>  name
> FROM ids
> WHERE id BETWEEN 900000000 AND 1050000000;{code}
> tableB has no skewed keys
> {code:java}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B'
>  name
> FROM ids
> WHERE id BETWEEN 950000000 AND 950500000;{code}
>  
> Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000
> {code:java}
> insert overwrite table result_with_skew
> select big_skewed.id, tabig_skewed.value, small_even.value
> from big_skewed
> join small_even
> on small_even.id=big_skewed.id;
> {code}
>  
> The sort merge join is slow with 2 straggle tasks
> !SMJ DAG.png!  
>   !SMJ tasks.png!
>  
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joint with broadcast join and the non-skewed keys are joint with sort merge join
> !skew join DAG.png!  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org