Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/21 06:11:16 UTC
(incubator-paimon) branch master updated: [core] Optimize HashBucketAssigner for more parallelism with fewer initialized buckets (#2536)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fdcc24f90 [core] Optimize HashBucketAssigner for more parallelism with fewer initialized buckets (#2536)
fdcc24f90 is described below
commit fdcc24f90f2762b294ac9ac2270527d6b3de5c95
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Dec 21 14:11:12 2023 +0800
[core] Optimize HashBucketAssigner for more parallelism with fewer initialized buckets (#2536)
---
docs/content/concepts/primary-key-table.md | 2 +-
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 11 ++++
.../java/org/apache/paimon/utils/MathUtils.java | 32 ++++++++++
.../org/apache/paimon/index/BucketAssigner.java | 6 ++
.../apache/paimon/index/HashBucketAssigner.java | 24 +++++---
.../apache/paimon/index/BucketAssignerTest.java | 44 ++++++++++++++
.../paimon/index/HashBucketAssignerTest.java | 69 ++++++++++++++++------
...mputer.java => CdcAssignerChannelComputer.java} | 15 +++--
.../flink/sink/cdc/CdcDynamicBucketSink.java | 4 +-
.../flink/sink/DynamicBucketCompactSink.java | 2 +-
.../paimon/flink/sink/DynamicBucketSink.java | 11 ++--
.../flink/sink/HashBucketAssignerOperator.java | 19 +++---
...mputer.java => RowAssignerChannelComputer.java} | 15 +++--
.../paimon/flink/sink/RowDynamicBucketSink.java | 4 +-
.../flink/sink/index/GlobalDynamicBucketSink.java | 6 +-
.../paimon/flink/DynamicBucketTableITCase.java | 14 ++++-
.../spark/commands/WriteIntoPaimonTable.scala | 6 +-
18 files changed, 234 insertions(+), 56 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md
index eb3ade035..40cf98ab1 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -53,7 +53,7 @@ an index to determine which key corresponds to which bucket.
Paimon will automatically expand the number of buckets.
- Option1: `'dynamic-bucket.target-row-num'`: controls the target row number for one bucket.
-- Option2: `'dynamic-bucket.assigner-parallelism'`: Parallelism of assigner operator, controls the number of initialized bucket.
+- Option2: `'dynamic-bucket.initial-buckets'`: controls the number of initialized buckets.
{{< hint info >}}
Dynamic Bucket only supports a single write job. Please do not start multiple jobs to write to the same partition
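Taken together, 'dynamic-bucket.target-row-num' caps how many rows a bucket absorbs before a new one is created, while the new 'dynamic-bucket.initial-buckets' caps how many buckets a partition starts with. A minimal sketch of setting both knobs programmatically, assuming Paimon's org.apache.paimon.options.Options exposes a string-keyed set(key, value) (the option keys themselves are defined in the CoreOptions change below):

    import org.apache.paimon.options.Options;

    Options options = new Options();
    // Each bucket targets roughly 2 million rows before a new bucket is opened.
    options.set("dynamic-bucket.target-row-num", "2000000");
    // Each partition starts with at most 4 buckets, independent of sink parallelism.
    options.set("dynamic-bucket.initial-buckets", "4");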
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 875286150..1ae196721 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -140,6 +140,12 @@ under the License.
<td>Boolean</td>
<td>Whether to ignore delete records in deduplicate mode.</td>
</tr>
+ <tr>
+ <td><h5>dynamic-bucket.initial-buckets</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
+ </tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index e3f410a87..7e757b591 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -764,6 +764,13 @@ public class CoreOptions implements Serializable {
"If the bucket is -1, for primary key table, is dynamic bucket mode, "
+ "this option controls the target row number for one bucket.");
+ public static final ConfigOption<Integer> DYNAMIC_BUCKET_INITIAL_BUCKETS =
+ key("dynamic-bucket.initial-buckets")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Initial buckets for a partition in assigner operator for dynamic bucket mode.");
+
public static final ConfigOption<Integer> DYNAMIC_BUCKET_ASSIGNER_PARALLELISM =
key("dynamic-bucket.assigner-parallelism")
.intType()
@@ -1318,6 +1325,10 @@ public class CoreOptions implements Serializable {
return options.get(SCAN_MANIFEST_PARALLELISM);
}
+ public Integer dynamicBucketInitialBuckets() {
+ return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
+ }
+
public Integer dynamicBucketAssignerParallelism() {
return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
index 11a5f670e..4bd0ba680 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
@@ -51,4 +51,36 @@ public class MathUtils {
}
return 31 - Integer.numberOfLeadingZeros(value);
}
+
+ public static Integer max(Integer v1, Integer v2) {
+ if (v1 == null && v2 == null) {
+ return null;
+ }
+
+ if (v1 != null && v2 == null) {
+ return v1;
+ }
+
+ if (v1 == null) {
+ return v2;
+ }
+
+ return Math.max(v1, v2);
+ }
+
+ public static Integer min(Integer v1, Integer v2) {
+ if (v1 == null && v2 == null) {
+ return null;
+ }
+
+ if (v1 != null && v2 == null) {
+ return v1;
+ }
+
+ if (v1 == null) {
+ return v2;
+ }
+
+ return Math.min(v1, v2);
+ }
}
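The null-safe min/max helpers exist because both bucket options are declared with noDefaultValue() and may therefore be null; treating null as "unset, defer to the other side" lets callers clamp or combine option values without a null check at every call site. A quick illustration of the semantics, with hypothetical values:

    Integer initialBuckets = null;              // option not configured
    Integer numChannels = 8;
    MathUtils.min(initialBuckets, numChannels); // -> 8, falls back to the non-null side
    MathUtils.max(3, 8);                        // -> 8, plain max when both are set
    MathUtils.max(null, null);                  // -> null, both unset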
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
index f87d6a16f..cec768558 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
@@ -26,4 +26,10 @@ public interface BucketAssigner {
int assign(BinaryRow partition, int hash);
void prepareCommit(long commitIdentifier);
+
+ static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
+ int start = Math.abs(partitionHash % numChannels);
+ int id = Math.abs(keyHash % numAssigners);
+ return (start + id) % numChannels;
+ }
}
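This routing formula is the heart of the change: the partition hash picks a starting channel, the key hash picks one of numAssigners consecutive channels from that start, and the sum wraps modulo numChannels. Each partition therefore touches only numAssigners of the numChannels assigner subtasks, while different partitions start at different offsets, which is exactly what yields more parallelism with fewer initialized buckets per partition. A worked example matching the new BucketAssignerTest below (partitionHash = 2, numChannels = 5, numAssigners = 3):

    int numChannels = 5, numAssigners = 3, partitionHash = 2;
    for (int keyHash = 0; keyHash <= 3; keyHash++) {
        int start = Math.abs(partitionHash % numChannels); // always 2
        int id = Math.abs(keyHash % numAssigners);         // 0, 1, 2, 0
        System.out.println((start + id) % numChannels);    // 2, 3, 4, 2
    }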
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
index 519a495b0..114b8d3df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -41,6 +41,7 @@ public class HashBucketAssigner implements BucketAssigner {
private final SnapshotManager snapshotManager;
private final String commitUser;
private final IndexFileHandler indexFileHandler;
+ private final int numChannels;
private final int numAssigners;
private final int assignId;
private final long targetBucketRowNumber;
@@ -51,12 +52,14 @@ public class HashBucketAssigner implements BucketAssigner {
SnapshotManager snapshotManager,
String commitUser,
IndexFileHandler indexFileHandler,
+ int numChannels,
int numAssigners,
int assignId,
long targetBucketRowNumber) {
this.snapshotManager = snapshotManager;
this.commitUser = commitUser;
this.indexFileHandler = indexFileHandler;
+ this.numChannels = numChannels;
this.numAssigners = numAssigners;
this.assignId = assignId;
this.targetBucketRowNumber = targetBucketRowNumber;
@@ -66,7 +69,8 @@ public class HashBucketAssigner implements BucketAssigner {
/** Assign a bucket for key hash of a record. */
@Override
public int assign(BinaryRow partition, int hash) {
- int recordAssignId = computeAssignId(hash);
+ int partitionHash = partition.hashCode();
+ int recordAssignId = computeAssignId(partitionHash, hash);
checkArgument(
recordAssignId == assignId,
"This is a bug, record assign id %s should equal to assign id %s.",
@@ -76,11 +80,11 @@ public class HashBucketAssigner implements BucketAssigner {
PartitionIndex index = this.partitionIndex.get(partition);
if (index == null) {
partition = partition.copy();
- index = loadIndex(partition);
+ index = loadIndex(partition, partitionHash);
this.partitionIndex.put(partition, index);
}
- int assigned = index.assign(hash, (bucket) -> computeAssignId(bucket) == assignId);
+ int assigned = index.assign(hash, this::isMyBucket);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Assign " + assigned + " to the partition " + partition + " key hash " + hash);
@@ -150,16 +154,20 @@ public class HashBucketAssigner implements BucketAssigner {
return partitionIndex.keySet();
}
- private int computeAssignId(int hash) {
- return Math.abs(hash % numAssigners);
+ private int computeAssignId(int partitionHash, int keyHash) {
+ return BucketAssigner.computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
}
- private PartitionIndex loadIndex(BinaryRow partition) {
+ private boolean isMyBucket(int bucket) {
+ return bucket % numAssigners == assignId % numAssigners;
+ }
+
+ private PartitionIndex loadIndex(BinaryRow partition, int partitionHash) {
return PartitionIndex.loadIndex(
indexFileHandler,
partition,
targetBucketRowNumber,
- (hash) -> computeAssignId(hash) == assignId,
- (bucket) -> computeAssignId(bucket) == assignId);
+ (hash) -> computeAssignId(partitionHash, hash) == assignId,
+ this::isMyBucket);
}
}
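Bucket ownership is now decoupled from the channel count: a subtask owns every bucket congruent to its assignId modulo numAssigners, so the buckets it creates are assignId % numAssigners, that value plus numAssigners, and so on. Because any single partition is routed to only numAssigners consecutive subtasks, it is initialized with at most numAssigners buckets, yet subtasks whose ids share a residue stay busy serving different partitions. A parameterized restatement of the private ownership rule above, using the numbers from testAssignDecoupled below (numChannels = 3, numAssigners = 2):

    // Subtasks 0 and 2 own the even buckets {0, 2, 4, ...}; subtask 1 owns {1, 3, 5, ...}.
    static boolean ownsBucket(int bucket, int assignId, int numAssigners) {
        return bucket % numAssigners == assignId % numAssigners;
    }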
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/BucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/BucketAssignerTest.java
new file mode 100644
index 000000000..edc4699f6
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/index/BucketAssignerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.junit.jupiter.api.Test;
+
+import static java.lang.Integer.MAX_VALUE;
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class BucketAssignerTest {
+
+ @Test
+ public void testComputeAssigner() {
+ assertThat(computeAssigner(MAX_VALUE, 0, 5, 5)).isEqualTo(2);
+ assertThat(computeAssigner(MAX_VALUE, 1, 5, 5)).isEqualTo(3);
+ assertThat(computeAssigner(MAX_VALUE, 2, 5, 5)).isEqualTo(4);
+ assertThat(computeAssigner(MAX_VALUE, 3, 5, 5)).isEqualTo(0);
+
+ assertThat(computeAssigner(2, 0, 5, 3)).isEqualTo(2);
+ assertThat(computeAssigner(2, 1, 5, 3)).isEqualTo(3);
+ assertThat(computeAssigner(2, 2, 5, 3)).isEqualTo(4);
+ assertThat(computeAssigner(2, 3, 5, 3)).isEqualTo(2);
+
+ assertThat(computeAssigner(3, 1, 5, 1)).isEqualTo(3);
+ assertThat(computeAssigner(3, 2, 5, 1)).isEqualTo(3);
+ }
+}
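The first block of assertions is direct arithmetic on the formula: Integer.MAX_VALUE is 2147483647, and 2147483647 % 5 == 2, so the start channel is 2 and key hashes 0 through 3 land on channels (2 + 0) % 5 through (2 + 3) % 5, that is 2, 3, 4, 0. In the second block only 3 assigners are active on 5 channels, so key hash 3 wraps back to id 3 % 3 == 0 and returns to channel 2; in the third block a single assigner collapses every key onto channel 3.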
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 38f2f7008..7bb11260a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -55,14 +55,20 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
commit.close();
}
- private HashBucketAssigner createAssigner(int numAssigners, int assignId) {
+ private HashBucketAssigner createAssigner(int numChannels, int numAssigners, int assignId) {
return new HashBucketAssigner(
- table.snapshotManager(), commitUser, fileHandler, numAssigners, assignId, 5);
+ table.snapshotManager(),
+ commitUser,
+ fileHandler,
+ numChannels,
+ numAssigners,
+ assignId,
+ 5);
}
@Test
public void testAssign() {
- HashBucketAssigner assigner = createAssigner(2, 0);
+ HashBucketAssigner assigner = createAssigner(2, 2, 0);
// assign
assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
@@ -87,7 +93,7 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
@Test
public void testPartitionCopy() {
- HashBucketAssigner assigner = createAssigner(1, 0);
+ HashBucketAssigner assigner = createAssigner(1, 1, 0);
BinaryRow partition = row(1);
assertThat(assigner.assign(partition, 0)).isEqualTo(0);
@@ -113,33 +119,60 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
@Test
public void testAssignRestore() {
- IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {0, 2});
- IndexFileMeta bucket1 = fileHandler.writeHashIndex(new int[] {3, 5});
+ IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {2, 5});
+ IndexFileMeta bucket2 = fileHandler.writeHashIndex(new int[] {4, 7});
commit.commit(
0,
Arrays.asList(
createCommitMessage(row(1), 0, bucket0),
- createCommitMessage(row(1), 1, bucket1)));
+ createCommitMessage(row(1), 2, bucket2)));
- HashBucketAssigner assigner0 = createAssigner(3, 0);
- HashBucketAssigner assigner2 = createAssigner(3, 2);
+ HashBucketAssigner assigner0 = createAssigner(3, 3, 0);
+ HashBucketAssigner assigner2 = createAssigner(3, 3, 2);
// read assigned
- assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
- assertThat(assigner2.assign(row(1), 2)).isEqualTo(0);
- assertThat(assigner0.assign(row(1), 3)).isEqualTo(1);
- assertThat(assigner2.assign(row(1), 5)).isEqualTo(1);
+ assertThat(assigner0.assign(row(1), 2)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 4)).isEqualTo(2);
+ assertThat(assigner0.assign(row(1), 5)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 7)).isEqualTo(2);
// new assign
- assertThat(assigner0.assign(row(1), 6)).isEqualTo(0);
- assertThat(assigner0.assign(row(1), 9)).isEqualTo(0);
- assertThat(assigner0.assign(row(1), 12)).isEqualTo(0);
- assertThat(assigner0.assign(row(1), 15)).isEqualTo(3);
+ assertThat(assigner0.assign(row(1), 8)).isEqualTo(0);
+ assertThat(assigner0.assign(row(1), 11)).isEqualTo(0);
+ assertThat(assigner0.assign(row(1), 14)).isEqualTo(0);
+ assertThat(assigner0.assign(row(1), 17)).isEqualTo(3);
+ }
+
+ @Test
+ public void testAssignDecoupled() {
+ HashBucketAssigner assigner1 = createAssigner(3, 2, 1);
+ assertThat(assigner1.assign(row(1), 0)).isEqualTo(1);
+ assertThat(assigner1.assign(row(1), 2)).isEqualTo(1);
+ assertThat(assigner1.assign(row(1), 4)).isEqualTo(1);
+ assertThat(assigner1.assign(row(1), 6)).isEqualTo(1);
+ assertThat(assigner1.assign(row(1), 8)).isEqualTo(1);
+ assertThat(assigner1.assign(row(1), 10)).isEqualTo(3);
+
+ HashBucketAssigner assigner2 = createAssigner(3, 2, 2);
+ assertThat(assigner2.assign(row(1), 1)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 3)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 5)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 7)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 9)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 11)).isEqualTo(2);
+
+ HashBucketAssigner assigner0 = createAssigner(3, 2, 0);
+ assertThat(assigner0.assign(row(2), 1)).isEqualTo(0);
+ assertThat(assigner0.assign(row(2), 3)).isEqualTo(0);
+ assertThat(assigner0.assign(row(2), 5)).isEqualTo(0);
+ assertThat(assigner0.assign(row(2), 7)).isEqualTo(0);
+ assertThat(assigner0.assign(row(2), 9)).isEqualTo(0);
+ assertThat(assigner0.assign(row(2), 11)).isEqualTo(2);
}
@Test
public void testIndexEliminate() {
- HashBucketAssigner assigner = createAssigner(1, 0);
+ HashBucketAssigner assigner = createAssigner(1, 1, 0);
// checkpoint 0
assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
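testAssignDecoupled pins down the new behavior: with numChannels = 3 but numAssigners = 2, partition row(1) is served by only two subtasks, which own the odd and even bucket residues respectively, so the partition starts with just buckets 1 and 0 and grows to buckets 3 and 2 once each reaches the 5-row target; partition row(2) starts at a different channel offset, which keeps subtask 0 busy as well. Under the old scheme, the same job would have initialized a bucket on every one of the 3 subtasks for every partition.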
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
similarity index 71%
rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
index 918233749..93cc31b47 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
@@ -20,32 +20,39 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.flink.sink.ChannelComputer;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.MathUtils;
+
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
/** Hash key of a {@link CdcRecord}. */
-public class CdcHashKeyChannelComputer implements ChannelComputer<CdcRecord> {
+public class CdcAssignerChannelComputer implements ChannelComputer<CdcRecord> {
private static final long serialVersionUID = 1L;
private final TableSchema schema;
+ private Integer numAssigners;
private transient int numChannels;
private transient CdcRecordKeyAndBucketExtractor extractor;
- public CdcHashKeyChannelComputer(TableSchema schema) {
+ public CdcAssignerChannelComputer(TableSchema schema, Integer numAssigners) {
this.schema = schema;
+ this.numAssigners = numAssigners;
}
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
+ this.numAssigners = MathUtils.min(numAssigners, numChannels);
this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
}
@Override
public int channel(CdcRecord record) {
extractor.setRecord(record);
- int hash = extractor.trimmedPrimaryKey().hashCode();
- return Math.abs(hash % numChannels);
+ int partitionHash = extractor.partition().hashCode();
+ int keyHash = extractor.trimmedPrimaryKey().hashCode();
+ return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
}
@Override
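This channel computer (and RowAssignerChannelComputer below) clamps the configured value at setup time: MathUtils.min(numAssigners, numChannels) resolves an unset option (null) to the full channel count and caps an oversized one, so computeAssigner can never route a record to a channel that does not exist. Hypothetical resolutions inside setup(2):

    // configured 'dynamic-bucket.initial-buckets' = 3 -> min(3, 2)    = 2 assigners
    // option not set (numAssigners == null)           -> min(null, 2) = 2 assigners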
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
index ecde6094c..bf4acf979 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
@@ -41,8 +41,8 @@ public class CdcDynamicBucketSink extends DynamicBucketSink<CdcRecord> {
}
@Override
- protected ChannelComputer<CdcRecord> channelComputer1() {
- return new CdcHashKeyChannelComputer(table.schema());
+ protected ChannelComputer<CdcRecord> assignerChannelComputer(Integer numAssigners) {
+ return new CdcAssignerChannelComputer(table.schema(), numAssigners);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
index 11e21eed6..009dce0ed 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
@@ -50,7 +50,7 @@ public class DynamicBucketCompactSink extends RowDynamicBucketSink {
// bucket-assigner
HashBucketAssignerOperator<InternalRow> assignerOperator =
new HashBucketAssignerOperator<>(
- initialCommitUser, table, extractorFunction(), true);
+ initialCommitUser, table, null, extractorFunction(), true);
TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
DataStream<Tuple2<InternalRow, Integer>> bucketAssigned =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
index 5efa87356..5ef2ebe7b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -46,7 +46,7 @@ public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Inte
super(table, overwritePartition);
}
- protected abstract ChannelComputer<T> channelComputer1();
+ protected abstract ChannelComputer<T> assignerChannelComputer(Integer numAssigners);
protected abstract ChannelComputer<Tuple2<T, Integer>> channelComputer2();
@@ -58,21 +58,22 @@ public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Inte
// Topology:
// input -- shuffle by key hash --> bucket-assigner -- shuffle by partition & bucket -->
- // writer -->
- // committer
+ // writer --> committer
// 1. shuffle by key hash
Integer assignerParallelism = table.coreOptions().dynamicBucketAssignerParallelism();
if (assignerParallelism == null) {
assignerParallelism = parallelism;
}
+
+ Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
DataStream<T> partitionByKeyHash =
- partition(input, channelComputer1(), assignerParallelism);
+ partition(input, assignerChannelComputer(numAssigners), assignerParallelism);
// 2. bucket-assigner
HashBucketAssignerOperator<T> assignerOperator =
new HashBucketAssignerOperator<>(
- initialCommitUser, table, extractorFunction(), false);
+ initialCommitUser, table, numAssigners, extractorFunction(), false);
TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO);
DataStream<Tuple2<T, Integer>> bucketAssigned =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index eb7934418..b0b9cf2b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -25,6 +25,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
import org.apache.paimon.utils.SerializableFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -42,6 +43,7 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
private final String initialCommitUser;
private final AbstractFileStoreTable table;
+ private final Integer numAssigners;
private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
private final boolean overwrite;
@@ -51,10 +53,12 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
public HashBucketAssignerOperator(
String commitUser,
Table table,
+ Integer numAssigners,
SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction,
boolean overwrite) {
this.initialCommitUser = commitUser;
this.table = (AbstractFileStoreTable) table;
+ this.numAssigners = numAssigners;
this.extractorFunction = extractorFunction;
this.overwrite = overwrite;
}
@@ -70,19 +74,20 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class, initialCommitUser);
+ int numberTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int taskId = getRuntimeContext().getIndexOfThisSubtask();
+ long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
this.assigner =
overwrite
- ? new SimpleHashBucketAssigner(
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask(),
- table.coreOptions().dynamicBucketTargetRowNum())
+ ? new SimpleHashBucketAssigner(numberTasks, taskId, targetRowNum)
: new HashBucketAssigner(
table.snapshotManager(),
commitUser,
table.store().newIndexFileHandler(),
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask(),
- table.coreOptions().dynamicBucketTargetRowNum());
+ numberTasks,
+ MathUtils.min(numAssigners, numberTasks),
+ taskId,
+ targetRowNum);
this.extractor = extractorFunction.apply(table.schema());
}
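The operator applies the matching clamp on the receiving side: DynamicBucketSink threads the same numAssigners value into both the shuffle and this operator, which passes MathUtils.min(numAssigners, numberTasks) as the assigner count while keeping numberTasks as the channel count. (DynamicBucketCompactSink passes null because its overwrite path takes the SimpleHashBucketAssigner branch, which does not use the assigner count.) Resolutions for a few representative settings:

    // numberTasks = 2, numAssigners = null -> min(null, 2) = 2 (pre-patch behavior)
    // numberTasks = 2, numAssigners = 3    -> min(3, 2)    = 2 (clamped; asserted in the ITCase below)
    // numberTasks = 8, numAssigners = 4    -> min(4, 8)    = 4 (decoupled)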
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAssignerChannelComputer.java
similarity index 71%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAssignerChannelComputer.java
index e0c131e9b..da004af24 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAssignerChannelComputer.java
@@ -21,33 +21,40 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
import org.apache.flink.table.data.RowData;
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
+
/** Hash key of a {@link RowData}. */
-public class RowHashKeyChannelComputer implements ChannelComputer<InternalRow> {
+public class RowAssignerChannelComputer implements ChannelComputer<InternalRow> {
private static final long serialVersionUID = 1L;
private final TableSchema schema;
+ private Integer numAssigners;
private transient int numChannels;
private transient RowPartitionKeyExtractor extractor;
- public RowHashKeyChannelComputer(TableSchema schema) {
+ public RowAssignerChannelComputer(TableSchema schema, Integer numAssigners) {
this.schema = schema;
+ this.numAssigners = numAssigners;
}
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
+ this.numAssigners = MathUtils.min(numAssigners, numChannels);
this.extractor = new RowPartitionKeyExtractor(schema);
}
@Override
public int channel(InternalRow record) {
- int hash = extractor.trimmedPrimaryKey(record).hashCode();
- return Math.abs(hash % numChannels);
+ int partitionHash = extractor.partition(record).hashCode();
+ int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
+ return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
index 077fec313..d4917fb90 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
@@ -43,8 +43,8 @@ public class RowDynamicBucketSink extends DynamicBucketSink<InternalRow> {
}
@Override
- protected ChannelComputer<InternalRow> channelComputer1() {
- return new RowHashKeyChannelComputer(table.schema());
+ protected ChannelComputer<InternalRow> assignerChannelComputer(Integer numAssigners) {
+ return new RowAssignerChannelComputer(table.schema(), numAssigners);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index 99b78936b..ccab48f0a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -31,6 +31,7 @@ import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.MathUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -93,7 +94,10 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow,
.setParallelism(input.getParallelism());
// 1. shuffle by key hash
- Integer assignerParallelism = options.dynamicBucketAssignerParallelism();
+ Integer assignerParallelism =
+ MathUtils.max(
+ options.dynamicBucketInitialBuckets(),
+ options.dynamicBucketAssignerParallelism());
if (assignerParallelism == null) {
assignerParallelism = parallelism;
}
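GlobalDynamicBucketSink does not yet decouple the two knobs: it conservatively takes the null-safe max of 'dynamic-bucket.initial-buckets' and 'dynamic-bucket.assigner-parallelism' as the shuffle parallelism, so either option alone still takes effect (max(3, null) == 3) while leaving both unset (max(null, null) == null) falls through to the sink parallelism. The Spark write path at the end of this patch applies the same max() stopgap, as its TODO notes.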
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index 85e71b731..06aaa23ca 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -81,10 +81,20 @@ public class DynamicBucketTableITCase extends CatalogITCaseBase {
@Test
public void testWriteWithAssignerParallelism() {
sql(
- "INSERT INTO T /*+ OPTIONS('dynamic-bucket.assigner-parallelism'='3') */ "
+ "INSERT INTO T /*+ OPTIONS('dynamic-bucket.initial-buckets'='3') */ "
+ "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)");
+ // initial-buckets is 3, but parallelism is 2, will use 2
assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
- .containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2));
+ .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
+ }
+
+ @Test
+ public void testWriteWithAssignerParallelism1() {
+ sql(
+ "INSERT INTO T /*+ OPTIONS('dynamic-bucket.initial-buckets'='1') */ "
+ + "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)");
+ assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+ .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
}
@Test
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index fe2aeede1..03d27729e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -29,6 +29,7 @@ import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageSerializer, RowPartitionKeyExtractor}
import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.MathUtils
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
@@ -106,7 +107,10 @@ case class WriteIntoPaimonTable(
case BucketMode.DYNAMIC =>
val partitioned = if (primaryKeyCols.nonEmpty) {
// Make sure that the records with the same bucket values is within a task.
- val assignerParallelism = table.coreOptions.dynamicBucketAssignerParallelism
+ // TODO supports decoupling of initialBuckets and assignerParallelism
+ var assignerParallelism = MathUtils.max(
+ table.coreOptions.dynamicBucketInitialBuckets,
+ table.coreOptions.dynamicBucketAssignerParallelism)
if (assignerParallelism != null) {
withBucketCol.repartition(assignerParallelism, primaryKeyCols: _*)
} else {