Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2022/10/06 16:58:00 UTC

[jira] [Updated] (SPARK-40407) Repartition of DataFrame can result in severe data skew in some special case

     [ https://issues.apache.org/jira/browse/SPARK-40407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-40407:
----------------------------------
    Issue Type: Bug  (was: Improvement)

> Repartition of DataFrame can result in severe data skew in some special case
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-40407
>                 URL: https://issues.apache.org/jira/browse/SPARK-40407
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.1, 3.1.1, 3.1.2, 3.1.3, 3.2.1, 3.3.0, 3.2.2
>            Reporter: Bobby Wang
>            Assignee: Bobby Wang
>            Priority: Major
>             Fix For: 3.4.0, 3.3.1, 3.2.3
>
>
> {code:scala}
> val df = spark.range(0, 100, 1, 50).repartition(4)
> val v = df.rdd.mapPartitions { iter =>
>   Iterator.single(iter.length)
> }.collect()
> println(v.mkString(","))
> {code}
> The simple code above prints `50,0,0,50`, which means partitions 1 and 2 receive no data.
> While debugging, I found that RoundRobin only distributes records evenly **within the same input partition**; it does not guarantee an even distribution across input partitions.
> Below is the code that generates the key:
> {code:scala}
>       case RoundRobinPartitioning(numPartitions) =>
>         // Distributes elements evenly across output partitions, starting from a random partition.
>         var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)  
>         (row: InternalRow) => {
>           // The HashPartitioner will handle the `mod` by the number of partitions
>           position += 1
>           position
>         }
> {code}
> In this case there are 50 input partitions, and each of them holds only 2 elements. The problem is that RoundRobin starts from *position=2* in every one of them.
> See the output of Random:
> {code:scala}
> scala> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))  // the position is always 2.
> 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 
> {code}
> Similarly, each of the following Random calls prints the same value for every partitionId:
> {code:scala}
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(2) + " "))
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(8) + " "))
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(16) + " "))
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " "))
> {code}
> Let's go back to this case.
> Consider partition 0, whose elements are [0, 1]. During the shuffle write, the key for element 0 is position + 1 = 2 + 1 = 3, and 3 % 4 = 3; the key for element 1 is position + 1 = 3 + 1 = 4, and 4 % 4 = 0.
> Consider partition 1, whose elements are [2, 3]. During the shuffle write, the key for element 2 is position + 1 = 2 + 1 = 3, and 3 % 4 = 3; the key for element 3 is position + 1 = 3 + 1 = 4, and 4 % 4 = 0.
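The arithmetic for partitions 0 and 1 can be reproduced without Spark. The following is a minimal sketch, assuming the key generator quoted earlier and a plain `% numPartitions` standing in for the HashPartitioner; the object `RoundRobinKeySketch` and the helper `targetPartitions` are hypothetical names introduced only for this illustration.

```scala
import java.util.Random

object RoundRobinKeySketch {
  // Hypothetical helper: mirrors the quoted key generator for one input partition
  // and returns the output partition chosen for each of its rows.
  def targetPartitions(partitionId: Int, numPartitions: Int, rowCount: Int): Seq[Int] = {
    // Same seeding as in the quoted code: the input partition ID is the seed.
    var position = new Random(partitionId).nextInt(numPartitions)
    (0 until rowCount).map { _ =>
      position += 1
      // Assumption: a plain modulo stands in for the HashPartitioner.
      position % numPartitions
    }
  }

  def main(args: Array[String]): Unit = {
    println(targetPartitions(0, 4, 2).mkString(",")) // prints 3,0
    println(targetPartitions(1, 4, 2).mkString(",")) // prints 3,0
  }
}
```

Both input partitions send their first row to output partition 3 and their second row to output partition 0, matching the calculation above.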
>  
> The same calculation applies to all remaining partitions, since the starting position is always 2 in this case.
> So, as you can see, every input partition writes its elements only to output partitions 0 and 3, which leaves output partitions 1 and 2 without any data.
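The skew across all 50 input partitions can be simulated in the same way. This is a rough sketch rather than Spark's actual shuffle path: it assumes each input partition holds exactly 2 rows and again uses a plain modulo in place of the HashPartitioner. The names `RepartitionSkewSketch` and `rowsPerOutputPartition` are hypothetical.

```scala
import java.util.Random

object RepartitionSkewSketch {
  // Hypothetical helper: counts how many rows land in each output partition
  // when every input partition uses the quoted round-robin key generator.
  def rowsPerOutputPartition(numInput: Int, rowsPerInput: Int, numOutput: Int): Array[Int] = {
    val counts = new Array[Int](numOutput)
    for (partitionId <- 0 until numInput) {
      // Starting position seeded by the input partition ID, as in the quoted code.
      var position = new Random(partitionId).nextInt(numOutput)
      for (_ <- 0 until rowsPerInput) {
        position += 1
        counts(position % numOutput) += 1
      }
    }
    counts
  }

  def main(args: Array[String]): Unit = {
    // 50 input partitions with 2 rows each, repartitioned into 4.
    println(rowsPerOutputPartition(50, 2, 4).mkString(",")) // prints 50,0,0,50
  }
}
```

The simulated counts agree with the `50,0,0,50` reported for the DataFrame example.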
>  
> I will try to provide a patch to fix this issue.
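One possible direction, shown only as an illustration and not necessarily what the eventual patch does, is to hash the partition ID before using it as the seed, so that consecutive small IDs no longer produce near-identical seeds. The sketch below assumes `scala.util.hashing.MurmurHash3.bytesHash` as the mixing function; `HashedSeedSketch` and `startPosition` are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.util.Random
import scala.util.hashing.MurmurHash3

object HashedSeedSketch {
  // Hypothetical mitigation: scramble the partition ID before seeding Random.
  def startPosition(partitionId: Int, numPartitions: Int): Int = {
    // Encode the ID as 8 bytes and hash them to get a well-mixed seed.
    val bytes = ByteBuffer.allocate(java.lang.Long.BYTES).putLong(partitionId.toLong).array()
    val seed = MurmurHash3.bytesHash(bytes)
    new Random(seed).nextInt(numPartitions)
  }

  def main(args: Array[String]): Unit = {
    // Starting positions for the 50 input partitions of the example.
    println((0 until 50).map(startPosition(_, 4)).mkString(" "))
  }
}
```

With a mixed seed the starting positions should vary between input partitions, so the rows of small partitions are spread over more output partitions instead of all landing in the same two.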


