Posted to issues@spark.apache.org by "Michael Wu (JIRA)" <ji...@apache.org> on 2019/05/03 12:06:00 UTC

[jira] [Updated] (SPARK-27628) SortMergeJoin on a low-cardinality column results in heavy skew and large partitions

     [ https://issues.apache.org/jira/browse/SPARK-27628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Wu updated SPARK-27628:
-------------------------------
    Description: 
Let's say we have a dataframe *a* that looks like this:
{code:java}
| temp | active |
|------|--------|
| 123  | Yes    |
| 1235 | No     |
...{code}
where the *active* column only contains two string values - "Yes" and "No". 

Let's say we do a join with some other dataframe *b* using *active* as the join key. Assume neither *a* nor *b* is small enough to allow for a broadcast join, so Spark is forced to do a sort-merge join.
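
For concreteness, here is a minimal sketch of the setup (the sizes, column contents and config tweak below are hypothetical, just to force the planner into this shape):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

val spark = SparkSession.builder().appName("skew-demo").getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for the two large dataframes described above.
val a = spark.range(0L, 100000000L)
  .select($"id".as("temp"), when($"id" % 2 === 0, "Yes").otherwise("No").as("active"))
val b = spark.range(0L, 100000000L)
  .select($"id".as("other"), when($"id" % 3 === 0, "Yes").otherwise("No").as("active"))

// Rule out broadcast joins so the planner has to fall back to a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
a.join(b, Seq("active")).explain()  // physical plan contains SortMergeJoin
{code}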

In a sort-merge join, Spark partitions the dataframes on both sides of the join, using the values in the join column to determine the partition a given row belongs to. This appears to be catastrophic when the column has low cardinality. In the case of dataframe *a*, it would be partitioned into X partitions by hashing the value in the *active* column modulo X (where X is the value of spark.sql.shuffle.partitions) - this means that only two partitions would have any data while the rest are empty. In cases where the dataframes involved in the join are large, this can add a lot of pressure on disk usage, not to mention reduced join performance due to the extreme skew. Is there any way around this behavior?
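
To the best of my understanding the partition id is pmod(murmur3hash(key), X), which is the same hash exposed by the built-in hash() function, so (continuing the sketch above, and assuming the default of 200 shuffle partitions) the distribution can be eyeballed like this:
{code:scala}
import org.apache.spark.sql.functions.{hash, pmod, lit}

// Approximate the shuffle partition id each row of `a` would be sent to.
val numShufflePartitions = spark.conf.get("spark.sql.shuffle.partitions").toInt  // 200 by default

a.select(pmod(hash($"active"), lit(numShufflePartitions)).as("partition_id"))
  .groupBy("partition_id")
  .count()
  .show()
// Only two partition ids appear; the remaining 198 partitions receive no rows.
{code}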

The current workaround I can think of is to introduce additional columns as join keys to more evenly distribute data during the partitioning part of the join. Is this the recommended approach?
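
For reference, a rough sketch of that workaround as I understand it (key salting); the salt width of 32 is arbitrary and this only covers the inner-join case:
{code:scala}
import org.apache.spark.sql.functions.{array, explode, floor, lit, rand}

val saltBuckets = 32  // arbitrary trade-off: more buckets = less skew, but more duplication of b

// Add a random salt to every row of the skewed side...
val aSalted = a.withColumn("salt", floor(rand() * saltBuckets).cast("int"))

// ...and replicate the other side once per salt value so each (active, salt)
// combination on the left still finds all of its matches on the right.
val bReplicated = b.withColumn("salt", explode(array((0 until saltBuckets).map(i => lit(i)): _*)))

// The composite key (active, salt) now hashes into up to 2 * saltBuckets partitions.
val joined = aSalted.join(bReplicated, Seq("active", "salt"))
{code}
The cost is that *b* gets duplicated saltBuckets times and the trick needs adjusting for outer joins, which is why I'd like to know whether there is a better supported approach.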


> SortMergeJoin on a low-cardinality column results in heavy skew and large partitions
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-27628
>                 URL: https://issues.apache.org/jira/browse/SPARK-27628
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.2
>            Reporter: Michael Wu
>            Priority: Major
>


