You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2023/03/30 02:17:00 UTC

[jira] [Updated] (HUDI-5671) BucketIndexPartitioner partition algorithm skew

     [ https://issues.apache.org/jira/browse/HUDI-5671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-5671:
--------------------------------------
    Fix Version/s: 0.12.3

> BucketIndexPartitioner partition algorithm skew
> -----------------------------------------------
>
>                 Key: HUDI-5671
>                 URL: https://issues.apache.org/jira/browse/HUDI-5671
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: flink, index
>            Reporter: loukey_j
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.13.0, 0.12.3
>
>         Attachments: image-2023-02-01-14-45-33-116.png, image-2023-02-01-14-50-29-703.png, image-2023-02-01-15-00-14-889.png, image-2023-02-01-15-05-15-491.png
>
>
> The online job runs for 13 days and finds that there are subtasks but no data processing, as shown in the figure below, this job uses the update time as the partition, uses the bucket index, the number of buckets is 128, and the write parallelism is 128. The key is uniform because the file size of each bucket is not much different from the storage point of view. After positioning, there is a skew in the shuffle algorithm.
> !image-2023-02-01-14-45-33-116.png!
> Potential disadvantages of algorithmic tilt:
>   1. The memory usage is uneven, some nodes may have high pressure on the JVM, and TM nodes are prone to timeout
>   2. It may cause the checkpoint to time out, because the data will be flushed to hdfs during the snapshot state. If the skew is serious, it will cause some nodes to take too long and cause timeout.
> current algorithm:
> !image-2023-02-01-14-50-29-703.png!
> Algorithm flaws:
>   1. curBucket ∈ [0, numBuckets -1]
>   2. For the number of globalHash values in the same partition <= numBuckets number, globalHash is divergent, and mod(globalHash, numPartitions) is easy to conflict
>   3. When numBuckets is relatively large, shuffleIndex is prone to conflicts, resulting in skew
> Algorithm optimization:
> !image-2023-02-01-15-00-14-889.png!
> kb = key % b; kb ∈ [0, b-1] pw = pt % w;
> pw ∈ [0, w-1] shuffleIndex = (pw + kb) % w 
> shuffleIndex ∈ [0, w-1] 
>  
> In fact, it is to calculate a pw according to the partition first. Pw can be understood as a slot Wn allocated to the partition. Different partitions have a slot.
> Then move b slots back on the basis of this slot as the writing of data for this partition
> !image-2023-02-01-15-05-15-491.png!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)