Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/03 05:56:16 UTC

[hudi] branch master updated: [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3282caa2242 [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)
3282caa2242 is described below

commit 3282caa22420f3012698ec8cce376ad986034300
Author: luokey <85...@qq.com>
AuthorDate: Fri Feb 3 00:56:07 2023 -0500

    [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)
---
 .../java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java  | 5 +++--
 .../org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java     | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index cf06dbc18d6..dcbb30fb8bc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -135,8 +135,9 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    * (partition + curBucket) % numPartitions == this taskID belongs to this task.
    */
   public boolean isBucketToLoad(int bucketNumber, String partition) {
-    int globalHash = ((partition + bucketNumber).hashCode()) & Integer.MAX_VALUE;
-    return BucketIdentifier.mod(globalHash, parallelism) == taskID;
+    final int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism;
+    int globalIndex = partitionIndex + bucketNumber;
+    return BucketIdentifier.mod(globalIndex, parallelism) == taskID;
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index 5fa3d1ab9a0..4e0c08b1046 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -42,7 +42,8 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<
   @Override
   public int partition(HoodieKey key, int numPartitions) {
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
-    int globalHash = (key.getPartitionPath() + curBucket).hashCode() & Integer.MAX_VALUE;
-    return BucketIdentifier.mod(globalHash, numPartitions);
+    int partitionIndex = (key.getPartitionPath().hashCode() & Integer.MAX_VALUE) % numPartitions;
+    int globalIndex = partitionIndex + curBucket;
+    return BucketIdentifier.mod(globalIndex, numPartitions);
   }
 }
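
For context on the change: the old code derived the target subtask from the hash of the concatenated string (partition + bucket number), which can cluster the buckets of a partition onto a few subtasks; the new code derives a starting index from the partition hash alone and adds the bucket number, so a partition's buckets fan out round-robin across all subtasks. The standalone Java sketch below is not part of the commit; the parallelism, bucket count, partition path and class name are illustrative assumptions, and plain % stands in for BucketIdentifier.mod since every operand here is non-negative. It counts buckets per subtask under both mappings:

    import java.util.Arrays;

    /**
     * Standalone sketch (not part of the commit): counts how many buckets of one
     * partition each write subtask would receive under the old hash-based mapping
     * and the new index-based mapping. Parallelism, bucket count and partition
     * path are made-up illustrative values.
     */
    public class BucketSkewSketch {
      public static void main(String[] args) {
        final int parallelism = 16;   // number of write subtasks (illustrative)
        final int numBuckets = 100;   // buckets in the partition (illustrative)
        final String partition = "par1";

        int[] oldCounts = new int[parallelism];
        int[] newCounts = new int[parallelism];

        for (int bucket = 0; bucket < numBuckets; bucket++) {
          // Old mapping: hash of the concatenated string "partition + bucket".
          int globalHash = (partition + bucket).hashCode() & Integer.MAX_VALUE;
          oldCounts[globalHash % parallelism]++;

          // New mapping: per-partition starting index plus the bucket number,
          // so a partition's buckets are spread round-robin over the subtasks.
          int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism;
          int globalIndex = partitionIndex + bucket;
          newCounts[globalIndex % parallelism]++;
        }

        System.out.println("old per-subtask bucket counts: " + Arrays.toString(oldCounts));
        System.out.println("new per-subtask bucket counts: " + Arrays.toString(newCounts));
      }
    }

With these made-up numbers the new mapping's per-subtask counts differ by at most one, because consecutive global indexes simply cycle through the subtasks, while the old string-hash mapping leaves some subtasks with roughly twice as many buckets as others.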