You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/31 06:46:10 UTC
[incubator-paimon] branch master updated: [flink] Assign batch splits for ContinuousFileSplitEnumerator (#770)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 07f24d9f4 [flink] Assign batch splits for ContinuousFileSplitEnumerator (#770)
07f24d9f4 is described below
commit 07f24d9f46a4af306c12839e28e8b34afca257dd
Author: 吴祥平 <40...@qq.com>
AuthorDate: Fri Mar 31 14:46:05 2023 +0800
[flink] Assign batch splits for ContinuousFileSplitEnumerator (#770)
---
.../source/ContinuousFileSplitEnumerator.java | 13 ++++--
.../flink/source/ContinuousFileStoreSource.java | 8 +++-
.../source/ContinuousFileSplitEnumeratorTest.java | 48 +++++++++++++++++++++-
3 files changed, 63 insertions(+), 6 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 771c5b57a..166c2acca 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -64,6 +64,9 @@ public class ContinuousFileSplitEnumerator
private final StreamTableScan scan;
+ /** Default batch split size, used to avoid exceeding `akka.framesize`. */
+ private final int splitBatchSize;
+
@Nullable private Long nextSnapshotId;
private boolean finished = false;
@@ -73,6 +76,7 @@ public class ContinuousFileSplitEnumerator
Collection<FileStoreSourceSplit> remainSplits,
@Nullable Long nextSnapshotId,
long discoveryInterval,
+ int splitBatchSize,
StreamTableScan scan) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
@@ -80,6 +84,7 @@ public class ContinuousFileSplitEnumerator
addSplits(remainSplits);
this.nextSnapshotId = nextSnapshotId;
this.discoveryInterval = discoveryInterval;
+ this.splitBatchSize = splitBatchSize;
this.readersAwaitingSplit = new HashSet<>();
this.splitGenerator = new FileStoreSourceSplitGenerator();
this.scan = scan;
@@ -205,9 +210,11 @@ public class ContinuousFileSplitEnumerator
readersAwaitingSplit.remove(task);
return;
}
- assignment
- .computeIfAbsent(task, i -> new ArrayList<>())
- .add(splits.poll());
+ List<FileStoreSourceSplit> taskAssignment =
+ assignment.computeIfAbsent(task, i -> new ArrayList<>());
+ if (taskAssignment.size() < splitBatchSize) {
+ taskAssignment.add(splits.poll());
+ }
}
}
});
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 09abb116d..e9c14ca15 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.source.ReadBuilder;
@@ -75,12 +76,15 @@ public class ContinuousFileStoreSource extends FlinkSource {
nextSnapshotId = checkpoint.currentSnapshotId();
splits = checkpoint.splits();
}
-
+ CoreOptions coreOptions = CoreOptions.fromMap(options);
return new ContinuousFileSplitEnumerator(
context,
splits,
nextSnapshotId,
- CoreOptions.fromMap(options).continuousDiscoveryInterval().toMillis(),
+ coreOptions.continuousDiscoveryInterval().toMillis(),
+ coreOptions
+ .toConfiguration()
+ .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
scanFactory.create(readBuilder, nextSnapshotId));
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index a06cd198e..1c9ea6b71 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -104,6 +104,45 @@ public class ContinuousFileSplitEnumeratorTest {
assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(2, 4));
}
+ @Test
+ public void testSplitWithBatch() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(1);
+ context.registerReader(0, "test-host");
+
+ List<FileStoreSourceSplit> initialSplits = new ArrayList<>();
+ for (int i = 1; i <= 18; i++) {
+ initialSplits.add(createSnapshotSplit(i, i, Collections.emptyList()));
+ }
+ final ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(initialSplits)
+ .setDiscoveryInterval(3)
+ .setSplitBatchSize(10)
+ .build();
+
+ // On the first split request, the first batch of 10 splits should be allocated
+ enumerator.handleSplitRequest(0, "test-host");
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ // Only subtask-0 is allocated.
+ assertThat(assignments).containsOnlyKeys(0);
+ assertThat(assignments.get(0).getAssignedSplits()).hasSize(10);
+
+ // test second batch assign
+ enumerator.handleSplitRequest(0, "test-host");
+
+ assertThat(assignments).containsOnlyKeys(0);
+ assertThat(assignments.get(0).getAssignedSplits()).hasSize(18);
+
+ // test third batch assign
+ enumerator.handleSplitRequest(0, "test-host");
+
+ assertThat(assignments).containsOnlyKeys(0);
+ assertThat(assignments.get(0).getAssignedSplits()).hasSize(18);
+ }
+
@Test
public void testSplitAllocationIsFair() {
final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
@@ -227,6 +266,8 @@ public class ContinuousFileSplitEnumeratorTest {
private SplitEnumeratorContext<FileStoreSourceSplit> context;
private Collection<FileStoreSourceSplit> initialSplits = Collections.emptyList();
private long discoveryInterval = Long.MAX_VALUE;
+
+ private int splitBatchSize = 10;
private StreamDataTableScan scan;
public Builder setSplitEnumeratorContext(
@@ -245,6 +286,11 @@ public class ContinuousFileSplitEnumeratorTest {
return this;
}
+ public Builder setSplitBatchSize(int splitBatchSize) {
+ this.splitBatchSize = splitBatchSize;
+ return this;
+ }
+
public Builder setScan(StreamDataTableScan scan) {
this.scan = scan;
return this;
@@ -252,7 +298,7 @@ public class ContinuousFileSplitEnumeratorTest {
public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
- context, initialSplits, null, discoveryInterval, scan);
+ context, initialSplits, null, discoveryInterval, splitBatchSize, scan);
}
}