Posted to issues@spark.apache.org by "Michael Wu (JIRA)" <ji...@apache.org> on 2019/05/03 12:06:00 UTC
[jira] [Updated] (SPARK-27628) SortMergeJoin on a low-cardinality column results in heavy skew and large partitions
[ https://issues.apache.org/jira/browse/SPARK-27628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Wu updated SPARK-27628:
-------------------------------
Description:
Let's say we have a dataframe *a* that looks like this:
{code:java}
| temp | active |
|------|--------|
| 123  | Yes    |
| 1235 | No     |
...{code}
where the *active* column only contains two string values - "Yes" and "No".
Let's say we do a join with some other dataframe *b* using *active* as the join key. Assume neither *a* nor *b* is small enough to allow for a broadcast join, so Spark is forced to do a sort-merge join.
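A minimal sketch that reproduces this setup (the column names are the ones above; the row counts, the contents of *b*, and disabling the broadcast threshold are illustrative assumptions used to force the sort-merge join):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("low-cardinality-join").getOrCreate()
import spark.implicits._

// Dataframe a: many rows, but only two distinct values in the join column.
val a = spark.range(0, 10000000L)
  .withColumn("temp", (rand() * 2000).cast("int"))
  .withColumn("active", when(rand() < 0.5, "Yes").otherwise("No"))
  .drop("id")

// Dataframe b: keyed on the same low-cardinality column.
val b = Seq(("Yes", 1), ("No", 0)).toDF("active", "flag")

// Disable broadcast joins for the demo so Spark plans a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

val joined = a.join(b, Seq("active"))
joined.explain()  // plan typically shows SortMergeJoin below an Exchange hashpartitioning on active
{code}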
In a sort-merge join, Spark partitions the dataframes on both sides of the join, using the values in the join column to determine the partition in which a given row belongs. This appears to be catastrophic when the column has low cardinality. In the case of dataframe *a*, it would be partitioned into X partitions by hashing the value in the *active* column and taking it modulo X (where X is the value of spark.sql.shuffle.partitions) - this means that only two partitions would have any data while the rest are empty. In cases where the dataframes involved in the join are large, this can put a lot of pressure on disk usage, not to mention reduced join performance due to pretty extreme skew. Is there any way around this behavior?
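The skew is easy to observe by hash-repartitioning *a* on the join column the same way the exchange would, then counting rows per partition (a hypothetical check, not part of the join itself):
{code:scala}
import org.apache.spark.sql.functions.{col, spark_partition_id}

// Hash-partition a on the join key, then count rows per resulting partition.
// With only "Yes" and "No" as key values, only two of the X partitions
// receive any rows; the rest are empty and never appear in the output.
a.repartition(col("active"))
  .groupBy(spark_partition_id().as("partition"))
  .count()
  .show()
{code}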
The current workaround I can think of is introducing additional columns as join keys to more evenly distribute data during the partitioning part of the join. Is this the recommended approach?
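One way to do that is key salting. The sketch below only illustrates the workaround (the salt column name and bucket count are made up, not an official Spark API): attach a random salt to the large side, replicate the other side across all salt values, and join on both columns.
{code:scala}
import org.apache.spark.sql.functions._

val saltBuckets = 16  // illustrative; pick based on the desired parallelism

// Large side: attach a random salt in [0, saltBuckets).
val aSalted = a.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Other side: replicate each row once per salt value so every (active, salt)
// combination on the left still finds its match.
val bSalted = b.withColumn("salt", explode(array((0 until saltBuckets).map(lit): _*)))

// Joining on (active, salt) spreads the rows over up to 2 * saltBuckets
// partitions instead of just 2.
val saltedJoin = aSalted.join(bSalted, Seq("active", "salt")).drop("salt")
{code}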
> SortMergeJoin on a low-cardinality column results in heavy skew and large partitions
> ------------------------------------------------------------------------------------
>
> Key: SPARK-27628
> URL: https://issues.apache.org/jira/browse/SPARK-27628
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.2
> Reporter: Michael Wu
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org