You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/24 02:41:02 UTC
[incubator-paimon] branch master updated: [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator (#687)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 20ce9665b [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator (#687)
20ce9665b is described below
commit 20ce9665b689615653f8170916bd4f286a8de09f
Author: 吴祥平 <40...@qq.com>
AuthorDate: Fri Mar 24 10:40:57 2023 +0800
[flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator (#687)
---
.../generated/flink_connector_configuration.html | 6 +++
.../paimon/flink/AbstractFlinkTableFactory.java | 6 ++-
.../apache/paimon/flink/FlinkConnectorOptions.java | 8 ++++
.../paimon/flink/source/SimpleSystemSource.java | 6 ++-
.../paimon/flink/source/StaticFileStoreSource.java | 9 ++++-
.../source/StaticFileStoreSplitEnumerator.java | 31 ++++++++++-----
.../paimon/flink/source/SystemTableSource.java | 14 +++++--
.../source/StaticFileStoreSplitEnumeratorTest.java | 46 ++++++++++++++++++++--
8 files changed, 105 insertions(+), 21 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 91486fd13..0b4e43135 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,6 +26,12 @@
<td>String</td>
<td>The log system used to keep changes of the table.<br /><br />Possible values:<br /><ul><li>"none": No log system, the data is written only to file store, and the streaming read will be directly read from the file store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written to file store and kafka, and the streaming read will be read from kafka.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>scan.split-enumerator.batch-size</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>Integer</td>
<td>How many splits should be assigned to each subtask per batch in StaticFileStoreSplitEnumerator, to avoid exceeding the `akka.framesize` limit.</td>
+ </tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index da9e2dba5..c12a5b0b2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -70,7 +70,11 @@ public abstract class AbstractFlinkTableFactory
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
if (origin instanceof SystemCatalogTable) {
- return new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode);
+ int splitBatchSize =
+ Options.fromMap(origin.getOptions())
+ .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
+ return new SystemTableSource(
+ ((SystemCatalogTable) origin).table(), isStreamingMode, splitBatchSize);
} else {
return new DataTableSource(
context.getObjectIdentifier(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 357c7a2ad..c882ffb60 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -161,6 +161,14 @@ public class FlinkConnectorOptions {
"How often tasks should notify coordinator about the current watermark "
+ "and how often the coordinator should announce the maximal aligned watermark.");
+ public static final ConfigOption<Integer> SCAN_SPLIT_ENUMERATOR_BATCH_SIZE =
+ key("scan.split-enumerator.batch-size")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
"How many splits should be assigned to each subtask per batch in StaticFileStoreSplitEnumerator "
+ "to avoid exceeding the `akka.framesize` limit.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
index 26b2e7167..5fe27f25d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
@@ -32,9 +32,11 @@ import java.util.Collection;
public class SimpleSystemSource extends FlinkSource {
private static final long serialVersionUID = 2L;
+ private final int splitBatchSize;
- public SimpleSystemSource(ReadBuilder readBuilder, @Nullable Long limit) {
+ public SimpleSystemSource(ReadBuilder readBuilder, @Nullable Long limit, int splitBatchSize) {
super(readBuilder, limit);
+ this.splitBatchSize = splitBatchSize;
}
@Override
@@ -52,6 +54,6 @@ public class SimpleSystemSource extends FlinkSource {
.createSplits(readBuilder.newScan().plan())
: checkpoint.splits();
- return new StaticFileStoreSplitEnumerator(context, null, splits);
+ return new StaticFileStoreSplitEnumerator(context, null, splits, splitBatchSize);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 49137ea4a..bef56959f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.source.BatchDataTableScan;
@@ -94,6 +95,12 @@ public class StaticFileStoreSource extends FlinkSource {
}
Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
- return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
+ return new StaticFileStoreSplitEnumerator(
+ context,
+ snapshot,
+ splits,
+ table.options()
+ .toConfiguration()
+ .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index 1ce87566f..89b1c0518 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -30,8 +30,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */
public class StaticFileStoreSplitEnumerator
@@ -41,24 +43,29 @@ public class StaticFileStoreSplitEnumerator
@Nullable private final Snapshot snapshot;
- private final Map<Integer, List<FileStoreSourceSplit>> pendingSplitAssignment;
+ /** Batch size for assigning splits per request, to avoid exceeding the `akka.framesize` limit. */
+ private final int splitBatchSize;
+
+ private final Map<Integer, Queue<FileStoreSourceSplit>> pendingSplitAssignment;
public StaticFileStoreSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
@Nullable Snapshot snapshot,
- Collection<FileStoreSourceSplit> splits) {
+ Collection<FileStoreSourceSplit> splits,
+ int splitBatchSize) {
this.context = context;
this.snapshot = snapshot;
this.pendingSplitAssignment = createSplitAssignment(splits, context.currentParallelism());
+ this.splitBatchSize = splitBatchSize;
}
- private static Map<Integer, List<FileStoreSourceSplit>> createSplitAssignment(
+ private static Map<Integer, Queue<FileStoreSourceSplit>> createSplitAssignment(
Collection<FileStoreSourceSplit> splits, int numReaders) {
- Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
+ Map<Integer, Queue<FileStoreSourceSplit>> assignment = new HashMap<>();
int i = 0;
for (FileStoreSourceSplit split : splits) {
int task = i % numReaders;
- assignment.computeIfAbsent(task, k -> new ArrayList<>()).add(split);
+ assignment.computeIfAbsent(task, k -> new LinkedList<>()).add(split);
i++;
}
return assignment;
@@ -79,10 +86,14 @@ public class StaticFileStoreSplitEnumerator
// The following batch assignment operation is for two purposes:
// To distribute splits evenly when batch reading to prevent a few tasks from reading all
// the data (for example, the current resource can only schedule part of the tasks).
- // TODO: assignment is already created in constructor, here can just assign per batch
- List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask);
- if (splits != null && splits.size() > 0) {
- context.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtask, splits)));
+ Queue<FileStoreSourceSplit> taskSplits = pendingSplitAssignment.get(subtask);
+ List<FileStoreSourceSplit> assignment = new ArrayList<>();
+ while (taskSplits != null && !taskSplits.isEmpty() && assignment.size() < splitBatchSize) {
+ assignment.add(taskSplits.poll());
+ }
+ if (assignment != null && assignment.size() > 0) {
+ context.assignSplits(
+ new SplitsAssignment<>(Collections.singletonMap(subtask, assignment)));
} else {
context.signalNoMoreSplits(subtask);
}
@@ -91,7 +102,7 @@ public class StaticFileStoreSplitEnumerator
@Override
public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) {
pendingSplitAssignment
- .computeIfAbsent(subtaskId, k -> new ArrayList<>())
+ .computeIfAbsent(subtaskId, k -> new LinkedList<>())
.addAll(backSplits);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index b2fa3c08b..e0a9a5502 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -35,11 +35,13 @@ public class SystemTableSource extends FlinkTableSource {
private final Table table;
private final boolean isStreamingMode;
+ private final int splitBatchSize;
- public SystemTableSource(Table table, boolean isStreamingMode) {
+ public SystemTableSource(Table table, boolean isStreamingMode, int splitBatchSize) {
super(table);
this.table = table;
this.isStreamingMode = isStreamingMode;
+ this.splitBatchSize = splitBatchSize;
}
public SystemTableSource(
@@ -47,10 +49,12 @@ public class SystemTableSource extends FlinkTableSource {
boolean isStreamingMode,
@Nullable Predicate predicate,
@Nullable int[][] projectFields,
- @Nullable Long limit) {
+ @Nullable Long limit,
+ int splitBatchSize) {
super(table, predicate, projectFields, limit);
this.table = table;
this.isStreamingMode = isStreamingMode;
+ this.splitBatchSize = splitBatchSize;
}
@Override
@@ -74,14 +78,16 @@ public class SystemTableSource extends FlinkTableSource {
table.newReadBuilder()
.withFilter(predicate)
.withProjection(projectFields),
- limit);
+ limit,
+ splitBatchSize);
}
return SourceProvider.of(source);
}
@Override
public DynamicTableSource copy() {
- return new SystemTableSource(table, isStreamingMode, predicate, projectFields, limit);
+ return new SystemTableSource(
+ table, isStreamingMode, predicate, projectFields, limit, splitBatchSize);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java
index 9e58f201e..a0cca4042 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java
@@ -45,7 +45,7 @@ public class StaticFileStoreSplitEnumeratorTest {
splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
}
StaticFileStoreSplitEnumerator enumerator =
- new StaticFileStoreSplitEnumerator(context, null, splits);
+ new StaticFileStoreSplitEnumerator(context, null, splits, 10);
// test assign
enumerator.handleSplitRequest(0, "test-host");
@@ -67,6 +67,46 @@ public class StaticFileStoreSplitEnumeratorTest {
.containsExactly(splits.get(0), splits.get(2));
}
+ @Test
+ public void testSplitBatch() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(2);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+
+ List<FileStoreSourceSplit> splits = new ArrayList<>();
+ for (int i = 1; i <= 28; i++) {
+ splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+ }
+ StaticFileStoreSplitEnumerator enumerator =
+ new StaticFileStoreSplitEnumerator(context, null, splits, 10);
+
+ // test assign
+ enumerator.handleSplitRequest(0, "test-host");
+ enumerator.handleSplitRequest(1, "test-host");
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1);
+ assertThat(assignments.get(0).getAssignedSplits()).hasSize(10);
+ assertThat(assignments.get(1).getAssignedSplits()).hasSize(10);
+
+ // test second batch assign
+ enumerator.handleSplitRequest(0, "test-host");
+ enumerator.handleSplitRequest(1, "test-host");
+
+ assertThat(assignments).containsOnlyKeys(0, 1);
+ assertThat(assignments.get(0).getAssignedSplits()).hasSize(14);
+ assertThat(assignments.get(1).getAssignedSplits()).hasSize(14);
+
+ // test third batch assign
+ enumerator.handleSplitRequest(0, "test-host");
+ enumerator.handleSplitRequest(1, "test-host");
+
+ assertThat(assignments).containsOnlyKeys(0, 1);
+ assertThat(assignments.get(0).hasReceivedNoMoreSplitsSignal()).isEqualTo(true);
+ assertThat(assignments.get(1).hasReceivedNoMoreSplitsSignal()).isEqualTo(true);
+ }
+
@Test
public void testSplitAllocationNotEvenly() {
final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
@@ -79,7 +119,7 @@ public class StaticFileStoreSplitEnumeratorTest {
splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
}
StaticFileStoreSplitEnumerator enumerator =
- new StaticFileStoreSplitEnumerator(context, null, splits);
+ new StaticFileStoreSplitEnumerator(context, null, splits, 10);
// test assign
enumerator.handleSplitRequest(0, "test-host");
@@ -105,7 +145,7 @@ public class StaticFileStoreSplitEnumeratorTest {
splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
}
StaticFileStoreSplitEnumerator enumerator =
- new StaticFileStoreSplitEnumerator(context, null, splits);
+ new StaticFileStoreSplitEnumerator(context, null, splits, 1);
// test assign
enumerator.handleSplitRequest(0, "test-host");