You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/21 06:11:16 UTC

(incubator-paimon) branch master updated: [core] Optimize HashBucketAssigner to more parallelism with less initialized bucket (#2536)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fdcc24f90 [core] Optimize HashBucketAssigner to more parallelism with less initialized bucket (#2536)
fdcc24f90 is described below

commit fdcc24f90f2762b294ac9ac2270527d6b3de5c95
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Dec 21 14:11:12 2023 +0800

    [core] Optimize HashBucketAssigner to more parallelism with less initialized bucket (#2536)
---
 docs/content/concepts/primary-key-table.md         |  2 +-
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 ++++
 .../java/org/apache/paimon/utils/MathUtils.java    | 32 ++++++++++
 .../org/apache/paimon/index/BucketAssigner.java    |  6 ++
 .../apache/paimon/index/HashBucketAssigner.java    | 24 +++++---
 .../apache/paimon/index/BucketAssignerTest.java    | 44 ++++++++++++++
 .../paimon/index/HashBucketAssignerTest.java       | 69 ++++++++++++++++------
 ...mputer.java => CdcAssignerChannelComputer.java} | 15 +++--
 .../flink/sink/cdc/CdcDynamicBucketSink.java       |  4 +-
 .../flink/sink/DynamicBucketCompactSink.java       |  2 +-
 .../paimon/flink/sink/DynamicBucketSink.java       | 11 ++--
 .../flink/sink/HashBucketAssignerOperator.java     | 19 +++---
 ...mputer.java => RowAssignerChannelComputer.java} | 15 +++--
 .../paimon/flink/sink/RowDynamicBucketSink.java    |  4 +-
 .../flink/sink/index/GlobalDynamicBucketSink.java  |  6 +-
 .../paimon/flink/DynamicBucketTableITCase.java     | 14 ++++-
 .../spark/commands/WriteIntoPaimonTable.scala      |  6 +-
 18 files changed, 234 insertions(+), 56 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md
index eb3ade035..40cf98ab1 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -53,7 +53,7 @@ an index to determine which key corresponds to which bucket.
 Paimon will automatically expand the number of buckets.
 
 - Option1: `'dynamic-bucket.target-row-num'`: controls the target row number for one bucket.
-- Option2: `'dynamic-bucket.assigner-parallelism'`: Parallelism of assigner operator, controls the number of initialized bucket.
+- Option2: `'dynamic-bucket.initial-buckets'`: controls the number of initialized buckets.
 
 {{< hint info >}}
 Dynamic Bucket only support single write job. Please do not start multiple jobs to write to the same partition 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 875286150..1ae196721 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -140,6 +140,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to ignore delete records in deduplicate mode.</td>
         </tr>
+        <tr>
+            <td><h5>dynamic-bucket.initial-buckets</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
+        </tr>
         <tr>
             <td><h5>dynamic-bucket.assigner-parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index e3f410a87..7e757b591 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -764,6 +764,13 @@ public class CoreOptions implements Serializable {
                             "If the bucket is -1, for primary key table, is dynamic bucket mode, "
                                     + "this option controls the target row number for one bucket.");
 
+    public static final ConfigOption<Integer> DYNAMIC_BUCKET_INITIAL_BUCKETS =
+            key("dynamic-bucket.initial-buckets")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Initial buckets for a partition in assigner operator for dynamic bucket mode.");
+
     public static final ConfigOption<Integer> DYNAMIC_BUCKET_ASSIGNER_PARALLELISM =
             key("dynamic-bucket.assigner-parallelism")
                     .intType()
@@ -1318,6 +1325,10 @@ public class CoreOptions implements Serializable {
         return options.get(SCAN_MANIFEST_PARALLELISM);
     }
 
+    public Integer dynamicBucketInitialBuckets() {
+        return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
+    }
+
     public Integer dynamicBucketAssignerParallelism() {
         return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
index 11a5f670e..4bd0ba680 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
@@ -51,4 +51,36 @@ public class MathUtils {
         }
         return 31 - Integer.numberOfLeadingZeros(value);
     }
+
+    public static Integer max(Integer v1, Integer v2) {
+        if (v1 == null && v2 == null) {
+            return null;
+        }
+
+        if (v1 != null && v2 == null) {
+            return v1;
+        }
+
+        if (v1 == null) {
+            return v2;
+        }
+
+        return Math.max(v1, v2);
+    }
+
+    public static Integer min(Integer v1, Integer v2) {
+        if (v1 == null && v2 == null) {
+            return null;
+        }
+
+        if (v1 != null && v2 == null) {
+            return v1;
+        }
+
+        if (v1 == null) {
+            return v2;
+        }
+
+        return Math.min(v1, v2);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
index f87d6a16f..cec768558 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
@@ -26,4 +26,10 @@ public interface BucketAssigner {
     int assign(BinaryRow partition, int hash);
 
     void prepareCommit(long commitIdentifier);
+
+    static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
+        int start = Math.abs(partitionHash % numChannels);
+        int id = Math.abs(keyHash % numAssigners);
+        return (start + id) % numChannels;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
index 519a495b0..114b8d3df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -41,6 +41,7 @@ public class HashBucketAssigner implements BucketAssigner {
     private final SnapshotManager snapshotManager;
     private final String commitUser;
     private final IndexFileHandler indexFileHandler;
+    private final int numChannels;
     private final int numAssigners;
     private final int assignId;
     private final long targetBucketRowNumber;
@@ -51,12 +52,14 @@ public class HashBucketAssigner implements BucketAssigner {
             SnapshotManager snapshotManager,
             String commitUser,
             IndexFileHandler indexFileHandler,
+            int numChannels,
             int numAssigners,
             int assignId,
             long targetBucketRowNumber) {
         this.snapshotManager = snapshotManager;
         this.commitUser = commitUser;
         this.indexFileHandler = indexFileHandler;
+        this.numChannels = numChannels;
         this.numAssigners = numAssigners;
         this.assignId = assignId;
         this.targetBucketRowNumber = targetBucketRowNumber;
@@ -66,7 +69,8 @@ public class HashBucketAssigner implements BucketAssigner {
     /** Assign a bucket for key hash of a record. */
     @Override
     public int assign(BinaryRow partition, int hash) {
-        int recordAssignId = computeAssignId(hash);
+        int partitionHash = partition.hashCode();
+        int recordAssignId = computeAssignId(partitionHash, hash);
         checkArgument(
                 recordAssignId == assignId,
                 "This is a bug, record assign id %s should equal to assign id %s.",
@@ -76,11 +80,11 @@ public class HashBucketAssigner implements BucketAssigner {
         PartitionIndex index = this.partitionIndex.get(partition);
         if (index == null) {
             partition = partition.copy();
-            index = loadIndex(partition);
+            index = loadIndex(partition, partitionHash);
             this.partitionIndex.put(partition, index);
         }
 
-        int assigned = index.assign(hash, (bucket) -> computeAssignId(bucket) == assignId);
+        int assigned = index.assign(hash, this::isMyBucket);
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Assign " + assigned + " to the partition " + partition + " key hash " + hash);
@@ -150,16 +154,20 @@ public class HashBucketAssigner implements BucketAssigner {
         return partitionIndex.keySet();
     }
 
-    private int computeAssignId(int hash) {
-        return Math.abs(hash % numAssigners);
+    private int computeAssignId(int partitionHash, int keyHash) {
+        return BucketAssigner.computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
     }
 
-    private PartitionIndex loadIndex(BinaryRow partition) {
+    private boolean isMyBucket(int bucket) {
+        return bucket % numAssigners == assignId % numAssigners;
+    }
+
+    private PartitionIndex loadIndex(BinaryRow partition, int partitionHash) {
         return PartitionIndex.loadIndex(
                 indexFileHandler,
                 partition,
                 targetBucketRowNumber,
-                (hash) -> computeAssignId(hash) == assignId,
-                (bucket) -> computeAssignId(bucket) == assignId);
+                (hash) -> computeAssignId(partitionHash, hash) == assignId,
+                this::isMyBucket);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/BucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/BucketAssignerTest.java
new file mode 100644
index 000000000..edc4699f6
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/index/BucketAssignerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.junit.jupiter.api.Test;
+
+import static java.lang.Integer.MAX_VALUE;
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class BucketAssignerTest {
+
+    @Test
+    public void testComputeAssigner() {
+        assertThat(computeAssigner(MAX_VALUE, 0, 5, 5)).isEqualTo(2);
+        assertThat(computeAssigner(MAX_VALUE, 1, 5, 5)).isEqualTo(3);
+        assertThat(computeAssigner(MAX_VALUE, 2, 5, 5)).isEqualTo(4);
+        assertThat(computeAssigner(MAX_VALUE, 3, 5, 5)).isEqualTo(0);
+
+        assertThat(computeAssigner(2, 0, 5, 3)).isEqualTo(2);
+        assertThat(computeAssigner(2, 1, 5, 3)).isEqualTo(3);
+        assertThat(computeAssigner(2, 2, 5, 3)).isEqualTo(4);
+        assertThat(computeAssigner(2, 3, 5, 3)).isEqualTo(2);
+
+        assertThat(computeAssigner(3, 1, 5, 1)).isEqualTo(3);
+        assertThat(computeAssigner(3, 2, 5, 1)).isEqualTo(3);
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 38f2f7008..7bb11260a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -55,14 +55,20 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
         commit.close();
     }
 
-    private HashBucketAssigner createAssigner(int numAssigners, int assignId) {
+    private HashBucketAssigner createAssigner(int numChannels, int numAssigners, int assignId) {
         return new HashBucketAssigner(
-                table.snapshotManager(), commitUser, fileHandler, numAssigners, assignId, 5);
+                table.snapshotManager(),
+                commitUser,
+                fileHandler,
+                numChannels,
+                numAssigners,
+                assignId,
+                5);
     }
 
     @Test
     public void testAssign() {
-        HashBucketAssigner assigner = createAssigner(2, 0);
+        HashBucketAssigner assigner = createAssigner(2, 2, 0);
 
         // assign
         assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
@@ -87,7 +93,7 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
 
     @Test
     public void testPartitionCopy() {
-        HashBucketAssigner assigner = createAssigner(1, 0);
+        HashBucketAssigner assigner = createAssigner(1, 1, 0);
 
         BinaryRow partition = row(1);
         assertThat(assigner.assign(partition, 0)).isEqualTo(0);
@@ -113,33 +119,60 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
 
     @Test
     public void testAssignRestore() {
-        IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {0, 2});
-        IndexFileMeta bucket1 = fileHandler.writeHashIndex(new int[] {3, 5});
+        IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {2, 5});
+        IndexFileMeta bucket2 = fileHandler.writeHashIndex(new int[] {4, 7});
         commit.commit(
                 0,
                 Arrays.asList(
                         createCommitMessage(row(1), 0, bucket0),
-                        createCommitMessage(row(1), 1, bucket1)));
+                        createCommitMessage(row(1), 2, bucket2)));
 
-        HashBucketAssigner assigner0 = createAssigner(3, 0);
-        HashBucketAssigner assigner2 = createAssigner(3, 2);
+        HashBucketAssigner assigner0 = createAssigner(3, 3, 0);
+        HashBucketAssigner assigner2 = createAssigner(3, 3, 2);
 
         // read assigned
-        assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
-        assertThat(assigner2.assign(row(1), 2)).isEqualTo(0);
-        assertThat(assigner0.assign(row(1), 3)).isEqualTo(1);
-        assertThat(assigner2.assign(row(1), 5)).isEqualTo(1);
+        assertThat(assigner0.assign(row(1), 2)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 4)).isEqualTo(2);
+        assertThat(assigner0.assign(row(1), 5)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 7)).isEqualTo(2);
 
         // new assign
-        assertThat(assigner0.assign(row(1), 6)).isEqualTo(0);
-        assertThat(assigner0.assign(row(1), 9)).isEqualTo(0);
-        assertThat(assigner0.assign(row(1), 12)).isEqualTo(0);
-        assertThat(assigner0.assign(row(1), 15)).isEqualTo(3);
+        assertThat(assigner0.assign(row(1), 8)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 11)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 14)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 17)).isEqualTo(3);
+    }
+
+    @Test
+    public void testAssignDecoupled() {
+        HashBucketAssigner assigner1 = createAssigner(3, 2, 1);
+        assertThat(assigner1.assign(row(1), 0)).isEqualTo(1);
+        assertThat(assigner1.assign(row(1), 2)).isEqualTo(1);
+        assertThat(assigner1.assign(row(1), 4)).isEqualTo(1);
+        assertThat(assigner1.assign(row(1), 6)).isEqualTo(1);
+        assertThat(assigner1.assign(row(1), 8)).isEqualTo(1);
+        assertThat(assigner1.assign(row(1), 10)).isEqualTo(3);
+
+        HashBucketAssigner assigner2 = createAssigner(3, 2, 2);
+        assertThat(assigner2.assign(row(1), 1)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 3)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 5)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 7)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 9)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 11)).isEqualTo(2);
+
+        HashBucketAssigner assigner0 = createAssigner(3, 2, 0);
+        assertThat(assigner0.assign(row(2), 1)).isEqualTo(0);
+        assertThat(assigner0.assign(row(2), 3)).isEqualTo(0);
+        assertThat(assigner0.assign(row(2), 5)).isEqualTo(0);
+        assertThat(assigner0.assign(row(2), 7)).isEqualTo(0);
+        assertThat(assigner0.assign(row(2), 9)).isEqualTo(0);
+        assertThat(assigner0.assign(row(2), 11)).isEqualTo(2);
     }
 
     @Test
     public void testIndexEliminate() {
-        HashBucketAssigner assigner = createAssigner(1, 0);
+        HashBucketAssigner assigner = createAssigner(1, 1, 0);
 
         // checkpoint 0
         assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
similarity index 71%
rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
index 918233749..93cc31b47 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
@@ -20,32 +20,39 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.flink.sink.ChannelComputer;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.MathUtils;
+
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
 
 /** Hash key of a {@link CdcRecord}. */
-public class CdcHashKeyChannelComputer implements ChannelComputer<CdcRecord> {
+public class CdcAssignerChannelComputer implements ChannelComputer<CdcRecord> {
 
     private static final long serialVersionUID = 1L;
 
     private final TableSchema schema;
+    private Integer numAssigners;
 
     private transient int numChannels;
     private transient CdcRecordKeyAndBucketExtractor extractor;
 
-    public CdcHashKeyChannelComputer(TableSchema schema) {
+    public CdcAssignerChannelComputer(TableSchema schema, Integer numAssigners) {
         this.schema = schema;
+        this.numAssigners = numAssigners;
     }
 
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
+        this.numAssigners = MathUtils.min(numAssigners, numChannels);
         this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
     }
 
     @Override
     public int channel(CdcRecord record) {
         extractor.setRecord(record);
-        int hash = extractor.trimmedPrimaryKey().hashCode();
-        return Math.abs(hash % numChannels);
+        int partitionHash = extractor.partition().hashCode();
+        int keyHash = extractor.trimmedPrimaryKey().hashCode();
+        return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
index ecde6094c..bf4acf979 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
@@ -41,8 +41,8 @@ public class CdcDynamicBucketSink extends DynamicBucketSink<CdcRecord> {
     }
 
     @Override
-    protected ChannelComputer<CdcRecord> channelComputer1() {
-        return new CdcHashKeyChannelComputer(table.schema());
+    protected ChannelComputer<CdcRecord> assignerChannelComputer(Integer numAssigners) {
+        return new CdcAssignerChannelComputer(table.schema(), numAssigners);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
index 11e21eed6..009dce0ed 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
@@ -50,7 +50,7 @@ public class DynamicBucketCompactSink extends RowDynamicBucketSink {
         // bucket-assigner
         HashBucketAssignerOperator<InternalRow> assignerOperator =
                 new HashBucketAssignerOperator<>(
-                        initialCommitUser, table, extractorFunction(), true);
+                        initialCommitUser, table, null, extractorFunction(), true);
         TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
                 new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
         DataStream<Tuple2<InternalRow, Integer>> bucketAssigned =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
index 5efa87356..5ef2ebe7b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -46,7 +46,7 @@ public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Inte
         super(table, overwritePartition);
     }
 
-    protected abstract ChannelComputer<T> channelComputer1();
+    protected abstract ChannelComputer<T> assignerChannelComputer(Integer numAssigners);
 
     protected abstract ChannelComputer<Tuple2<T, Integer>> channelComputer2();
 
@@ -58,21 +58,22 @@ public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Inte
 
         // Topology:
         // input -- shuffle by key hash --> bucket-assigner -- shuffle by partition & bucket -->
-        // writer -->
-        // committer
+        // writer --> committer
 
         // 1. shuffle by key hash
         Integer assignerParallelism = table.coreOptions().dynamicBucketAssignerParallelism();
         if (assignerParallelism == null) {
             assignerParallelism = parallelism;
         }
+
+        Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
         DataStream<T> partitionByKeyHash =
-                partition(input, channelComputer1(), assignerParallelism);
+                partition(input, assignerChannelComputer(numAssigners), assignerParallelism);
 
         // 2. bucket-assigner
         HashBucketAssignerOperator<T> assignerOperator =
                 new HashBucketAssignerOperator<>(
-                        initialCommitUser, table, extractorFunction(), false);
+                        initialCommitUser, table, numAssigners, extractorFunction(), false);
         TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
                 new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO);
         DataStream<Tuple2<T, Integer>> bucketAssigned =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index eb7934418..b0b9cf2b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -25,6 +25,7 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
 import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -42,6 +43,7 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
     private final String initialCommitUser;
 
     private final AbstractFileStoreTable table;
+    private final Integer numAssigners;
     private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
     private final boolean overwrite;
 
@@ -51,10 +53,12 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
     public HashBucketAssignerOperator(
             String commitUser,
             Table table,
+            Integer numAssigners,
             SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction,
             boolean overwrite) {
         this.initialCommitUser = commitUser;
         this.table = (AbstractFileStoreTable) table;
+        this.numAssigners = numAssigners;
         this.extractorFunction = extractorFunction;
         this.overwrite = overwrite;
     }
@@ -70,19 +74,20 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, initialCommitUser);
 
+        int numberTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int taskId = getRuntimeContext().getIndexOfThisSubtask();
+        long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
         this.assigner =
                 overwrite
-                        ? new SimpleHashBucketAssigner(
-                                getRuntimeContext().getNumberOfParallelSubtasks(),
-                                getRuntimeContext().getIndexOfThisSubtask(),
-                                table.coreOptions().dynamicBucketTargetRowNum())
+                        ? new SimpleHashBucketAssigner(numberTasks, taskId, targetRowNum)
                         : new HashBucketAssigner(
                                 table.snapshotManager(),
                                 commitUser,
                                 table.store().newIndexFileHandler(),
-                                getRuntimeContext().getNumberOfParallelSubtasks(),
-                                getRuntimeContext().getIndexOfThisSubtask(),
-                                table.coreOptions().dynamicBucketTargetRowNum());
+                                numberTasks,
+                                MathUtils.min(numAssigners, numberTasks),
+                                taskId,
+                                targetRowNum);
         this.extractor = extractorFunction.apply(table.schema());
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAssignerChannelComputer.java
similarity index 71%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAssignerChannelComputer.java
index e0c131e9b..da004af24 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAssignerChannelComputer.java
@@ -21,33 +21,40 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
 
 import org.apache.flink.table.data.RowData;
 
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
+
 /** Hash key of a {@link RowData}. */
-public class RowHashKeyChannelComputer implements ChannelComputer<InternalRow> {
+public class RowAssignerChannelComputer implements ChannelComputer<InternalRow> {
 
     private static final long serialVersionUID = 1L;
 
     private final TableSchema schema;
+    private Integer numAssigners;
 
     private transient int numChannels;
     private transient RowPartitionKeyExtractor extractor;
 
-    public RowHashKeyChannelComputer(TableSchema schema) {
+    public RowAssignerChannelComputer(TableSchema schema, Integer numAssigners) {
         this.schema = schema;
+        this.numAssigners = numAssigners;
     }
 
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
+        this.numAssigners = MathUtils.min(numAssigners, numChannels);
         this.extractor = new RowPartitionKeyExtractor(schema);
     }
 
     @Override
     public int channel(InternalRow record) {
-        int hash = extractor.trimmedPrimaryKey(record).hashCode();
-        return Math.abs(hash % numChannels);
+        int partitionHash = extractor.partition(record).hashCode();
+        int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
+        return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
index 077fec313..d4917fb90 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
@@ -43,8 +43,8 @@ public class RowDynamicBucketSink extends DynamicBucketSink<InternalRow> {
     }
 
     @Override
-    protected ChannelComputer<InternalRow> channelComputer1() {
-        return new RowHashKeyChannelComputer(table.schema());
+    protected ChannelComputer<InternalRow> assignerChannelComputer(Integer numAssigners) {
+        return new RowAssignerChannelComputer(table.schema(), numAssigners);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index 99b78936b..ccab48f0a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -31,6 +31,7 @@ import org.apache.paimon.flink.utils.InternalTypeInfo;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.MathUtils;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -93,7 +94,10 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow,
                         .setParallelism(input.getParallelism());
 
         // 1. shuffle by key hash
-        Integer assignerParallelism = options.dynamicBucketAssignerParallelism();
+        Integer assignerParallelism =
+                MathUtils.max(
+                        options.dynamicBucketInitialBuckets(),
+                        options.dynamicBucketAssignerParallelism());
         if (assignerParallelism == null) {
             assignerParallelism = parallelism;
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index 85e71b731..06aaa23ca 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -81,10 +81,20 @@ public class DynamicBucketTableITCase extends CatalogITCaseBase {
     @Test
     public void testWriteWithAssignerParallelism() {
         sql(
-                "INSERT INTO T /*+ OPTIONS('dynamic-bucket.assigner-parallelism'='3') */ "
+                "INSERT INTO T /*+ OPTIONS('dynamic-bucket.initial-buckets'='3') */ "
                         + "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)");
+        // initial-buckets is 3, but the parallelism is only 2, so only 2 buckets are used
         assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
-                .containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2));
+                .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
+    }
+
+    @Test
+    public void testWriteWithAssignerParallelism1() {
+        sql(
+                "INSERT INTO T /*+ OPTIONS('dynamic-bucket.initial-buckets'='1') */ "
+                        + "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)");
+        assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+                .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
     }
 
     @Test
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index fe2aeede1..03d27729e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -29,6 +29,7 @@ import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageSerializer, RowPartitionKeyExtractor}
 import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.MathUtils
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
@@ -106,7 +107,10 @@ case class WriteIntoPaimonTable(
         case BucketMode.DYNAMIC =>
           val partitioned = if (primaryKeyCols.nonEmpty) {
             // Make sure that the records with the same bucket values is within a task.
-            val assignerParallelism = table.coreOptions.dynamicBucketAssignerParallelism
+            // TODO: support decoupling of initialBuckets and assignerParallelism
+            var assignerParallelism = MathUtils.max(
+              table.coreOptions.dynamicBucketInitialBuckets,
+              table.coreOptions.dynamicBucketAssignerParallelism)
             if (assignerParallelism != null) {
               withBucketCol.repartition(assignerParallelism, primaryKeyCols: _*)
             } else {