You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/24 02:41:02 UTC

[incubator-paimon] branch master updated: [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator (#687)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 20ce9665b [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator (#687)
20ce9665b is described below

commit 20ce9665b689615653f8170916bd4f286a8de09f
Author: 吴祥平 <40...@qq.com>
AuthorDate: Fri Mar 24 10:40:57 2023 +0800

    [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator (#687)
---
 .../generated/flink_connector_configuration.html   |  6 +++
 .../paimon/flink/AbstractFlinkTableFactory.java    |  6 ++-
 .../apache/paimon/flink/FlinkConnectorOptions.java |  8 ++++
 .../paimon/flink/source/SimpleSystemSource.java    |  6 ++-
 .../paimon/flink/source/StaticFileStoreSource.java |  9 ++++-
 .../source/StaticFileStoreSplitEnumerator.java     | 31 ++++++++++-----
 .../paimon/flink/source/SystemTableSource.java     | 14 +++++--
 .../source/StaticFileStoreSplitEnumeratorTest.java | 46 ++++++++++++++++++++--
 8 files changed, 105 insertions(+), 21 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 91486fd13..0b4e43135 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,6 +26,12 @@
             <td>String</td>
             <td>The log system used to keep changes of the table.<br /><br />Possible values:<br /><ul><li>"none": No log system, the data is written only to file store, and the streaming read will be directly read from the file store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written to file store and kafka, and the streaming read will be read from kafka.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>scan.split-enumerator.batch-size</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>How many splits should be assigned to each subtask per batch in StaticFileStoreSplitEnumerator to avoid exceeding the `akka.framesize` limit.</td>
+        </tr>
         <tr>
             <td><h5>scan.parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index da9e2dba5..c12a5b0b2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -70,7 +70,11 @@ public abstract class AbstractFlinkTableFactory
                 context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
         if (origin instanceof SystemCatalogTable) {
-            return new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode);
+            int splitBatchSize =
+                    Options.fromMap(origin.getOptions())
+                            .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
+            return new SystemTableSource(
+                    ((SystemCatalogTable) origin).table(), isStreamingMode, splitBatchSize);
         } else {
             return new DataTableSource(
                     context.getObjectIdentifier(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 357c7a2ad..c882ffb60 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -161,6 +161,14 @@ public class FlinkConnectorOptions {
                             "How often tasks should notify coordinator about the current watermark "
                                     + "and how often the coordinator should announce the maximal aligned watermark.");
 
+    public static final ConfigOption<Integer> SCAN_SPLIT_ENUMERATOR_BATCH_SIZE =
+            key("scan.split-enumerator.batch-size")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "How many splits should be assigned to each subtask per batch in StaticFileStoreSplitEnumerator "
+                                    + "to avoid exceeding the `akka.framesize` limit.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
index 26b2e7167..5fe27f25d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
@@ -32,9 +32,11 @@ import java.util.Collection;
 public class SimpleSystemSource extends FlinkSource {
 
     private static final long serialVersionUID = 2L;
+    private final int splitBatchSize;
 
-    public SimpleSystemSource(ReadBuilder readBuilder, @Nullable Long limit) {
+    public SimpleSystemSource(ReadBuilder readBuilder, @Nullable Long limit, int splitBatchSize) {
         super(readBuilder, limit);
+        this.splitBatchSize = splitBatchSize;
     }
 
     @Override
@@ -52,6 +54,6 @@ public class SimpleSystemSource extends FlinkSource {
                                 .createSplits(readBuilder.newScan().plan())
                         : checkpoint.splits();
 
-        return new StaticFileStoreSplitEnumerator(context, null, splits);
+        return new StaticFileStoreSplitEnumerator(context, null, splits, splitBatchSize);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 49137ea4a..bef56959f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.source.BatchDataTableScan;
@@ -94,6 +95,12 @@ public class StaticFileStoreSource extends FlinkSource {
         }
 
         Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
-        return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
+        return new StaticFileStoreSplitEnumerator(
+                context,
+                snapshot,
+                splits,
+                table.options()
+                        .toConfiguration()
+                        .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index 1ce87566f..89b1c0518 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -30,8 +30,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 
 /** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */
 public class StaticFileStoreSplitEnumerator
@@ -41,24 +43,29 @@ public class StaticFileStoreSplitEnumerator
 
     @Nullable private final Snapshot snapshot;
 
-    private final Map<Integer, List<FileStoreSourceSplit>> pendingSplitAssignment;
+    /** Batch size for split assignment, used to avoid exceeding the `akka.framesize` limit. */
+    private final int splitBatchSize;
+
+    private final Map<Integer, Queue<FileStoreSourceSplit>> pendingSplitAssignment;
 
     public StaticFileStoreSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             @Nullable Snapshot snapshot,
-            Collection<FileStoreSourceSplit> splits) {
+            Collection<FileStoreSourceSplit> splits,
+            int splitBatchSize) {
         this.context = context;
         this.snapshot = snapshot;
         this.pendingSplitAssignment = createSplitAssignment(splits, context.currentParallelism());
+        this.splitBatchSize = splitBatchSize;
     }
 
-    private static Map<Integer, List<FileStoreSourceSplit>> createSplitAssignment(
+    private static Map<Integer, Queue<FileStoreSourceSplit>> createSplitAssignment(
             Collection<FileStoreSourceSplit> splits, int numReaders) {
-        Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
+        Map<Integer, Queue<FileStoreSourceSplit>> assignment = new HashMap<>();
         int i = 0;
         for (FileStoreSourceSplit split : splits) {
             int task = i % numReaders;
-            assignment.computeIfAbsent(task, k -> new ArrayList<>()).add(split);
+            assignment.computeIfAbsent(task, k -> new LinkedList<>()).add(split);
             i++;
         }
         return assignment;
@@ -79,10 +86,14 @@ public class StaticFileStoreSplitEnumerator
         // The following batch assignment operation is for two purposes:
         // To distribute splits evenly when batch reading to prevent a few tasks from reading all
         // the data (for example, the current resource can only schedule part of the tasks).
-        // TODO: assignment is already created in constructor, here can just assign per batch
-        List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask);
-        if (splits != null && splits.size() > 0) {
-            context.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtask, splits)));
+        Queue<FileStoreSourceSplit> taskSplits = pendingSplitAssignment.get(subtask);
+        List<FileStoreSourceSplit> assignment = new ArrayList<>();
+        while (taskSplits != null && !taskSplits.isEmpty() && assignment.size() < splitBatchSize) {
+            assignment.add(taskSplits.poll());
+        }
+        if (assignment != null && assignment.size() > 0) {
+            context.assignSplits(
+                    new SplitsAssignment<>(Collections.singletonMap(subtask, assignment)));
         } else {
             context.signalNoMoreSplits(subtask);
         }
@@ -91,7 +102,7 @@ public class StaticFileStoreSplitEnumerator
     @Override
     public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) {
         pendingSplitAssignment
-                .computeIfAbsent(subtaskId, k -> new ArrayList<>())
+                .computeIfAbsent(subtaskId, k -> new LinkedList<>())
                 .addAll(backSplits);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index b2fa3c08b..e0a9a5502 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -35,11 +35,13 @@ public class SystemTableSource extends FlinkTableSource {
 
     private final Table table;
     private final boolean isStreamingMode;
+    private final int splitBatchSize;
 
-    public SystemTableSource(Table table, boolean isStreamingMode) {
+    public SystemTableSource(Table table, boolean isStreamingMode, int splitBatchSize) {
         super(table);
         this.table = table;
         this.isStreamingMode = isStreamingMode;
+        this.splitBatchSize = splitBatchSize;
     }
 
     public SystemTableSource(
@@ -47,10 +49,12 @@ public class SystemTableSource extends FlinkTableSource {
             boolean isStreamingMode,
             @Nullable Predicate predicate,
             @Nullable int[][] projectFields,
-            @Nullable Long limit) {
+            @Nullable Long limit,
+            int splitBatchSize) {
         super(table, predicate, projectFields, limit);
         this.table = table;
         this.isStreamingMode = isStreamingMode;
+        this.splitBatchSize = splitBatchSize;
     }
 
     @Override
@@ -74,14 +78,16 @@ public class SystemTableSource extends FlinkTableSource {
                             table.newReadBuilder()
                                     .withFilter(predicate)
                                     .withProjection(projectFields),
-                            limit);
+                            limit,
+                            splitBatchSize);
         }
         return SourceProvider.of(source);
     }
 
     @Override
     public DynamicTableSource copy() {
-        return new SystemTableSource(table, isStreamingMode, predicate, projectFields, limit);
+        return new SystemTableSource(
+                table, isStreamingMode, predicate, projectFields, limit, splitBatchSize);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java
index 9e58f201e..a0cca4042 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java
@@ -45,7 +45,7 @@ public class StaticFileStoreSplitEnumeratorTest {
             splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
         }
         StaticFileStoreSplitEnumerator enumerator =
-                new StaticFileStoreSplitEnumerator(context, null, splits);
+                new StaticFileStoreSplitEnumerator(context, null, splits, 10);
 
         // test assign
         enumerator.handleSplitRequest(0, "test-host");
@@ -67,6 +67,46 @@ public class StaticFileStoreSplitEnumeratorTest {
                 .containsExactly(splits.get(0), splits.get(2));
     }
 
+    @Test
+    public void testSplitBatch() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(2);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+
+        List<FileStoreSourceSplit> splits = new ArrayList<>();
+        for (int i = 1; i <= 28; i++) {
+            splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+        }
+        StaticFileStoreSplitEnumerator enumerator =
+                new StaticFileStoreSplitEnumerator(context, null, splits, 10);
+
+        // test assign
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(1, "test-host");
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        assertThat(assignments.get(0).getAssignedSplits()).hasSize(10);
+        assertThat(assignments.get(1).getAssignedSplits()).hasSize(10);
+
+        // test second batch assign
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(1, "test-host");
+
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        assertThat(assignments.get(0).getAssignedSplits()).hasSize(14);
+        assertThat(assignments.get(1).getAssignedSplits()).hasSize(14);
+
+        // test third batch assign
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(1, "test-host");
+
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        assertThat(assignments.get(0).hasReceivedNoMoreSplitsSignal()).isEqualTo(true);
+        assertThat(assignments.get(1).hasReceivedNoMoreSplitsSignal()).isEqualTo(true);
+    }
+
     @Test
     public void testSplitAllocationNotEvenly() {
         final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
@@ -79,7 +119,7 @@ public class StaticFileStoreSplitEnumeratorTest {
             splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
         }
         StaticFileStoreSplitEnumerator enumerator =
-                new StaticFileStoreSplitEnumerator(context, null, splits);
+                new StaticFileStoreSplitEnumerator(context, null, splits, 10);
 
         // test assign
         enumerator.handleSplitRequest(0, "test-host");
@@ -105,7 +145,7 @@ public class StaticFileStoreSplitEnumeratorTest {
             splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
         }
         StaticFileStoreSplitEnumerator enumerator =
-                new StaticFileStoreSplitEnumerator(context, null, splits);
+                new StaticFileStoreSplitEnumerator(context, null, splits, 1);
 
         // test assign
         enumerator.handleSplitRequest(0, "test-host");