Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/31 06:46:10 UTC

[incubator-paimon] branch master updated: [flink] Assign batch splits for ContinuousFileSplitEnumerator (#770)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 07f24d9f4 [flink] Assign batch splits for ContinuousFileSplitEnumerator (#770)
07f24d9f4 is described below

commit 07f24d9f46a4af306c12839e28e8b34afca257dd
Author: 吴祥平 <40...@qq.com>
AuthorDate: Fri Mar 31 14:46:05 2023 +0800

    [flink] Assign batch splits for ContinuousFileSplitEnumerator (#770)
---
 .../source/ContinuousFileSplitEnumerator.java      | 13 ++++--
 .../flink/source/ContinuousFileStoreSource.java    |  8 +++-
 .../source/ContinuousFileSplitEnumeratorTest.java  | 48 +++++++++++++++++++++-
 3 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 771c5b57a..166c2acca 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -64,6 +64,9 @@ public class ContinuousFileSplitEnumerator
 
     private final StreamTableScan scan;
 
+    /** Default batch split size, to avoid exceeding `akka.framesize`. */
+    private final int splitBatchSize;
+
     @Nullable private Long nextSnapshotId;
 
     private boolean finished = false;
@@ -73,6 +76,7 @@ public class ContinuousFileSplitEnumerator
             Collection<FileStoreSourceSplit> remainSplits,
             @Nullable Long nextSnapshotId,
             long discoveryInterval,
+            int splitBatchSize,
             StreamTableScan scan) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
@@ -80,6 +84,7 @@ public class ContinuousFileSplitEnumerator
         addSplits(remainSplits);
         this.nextSnapshotId = nextSnapshotId;
         this.discoveryInterval = discoveryInterval;
+        this.splitBatchSize = splitBatchSize;
         this.readersAwaitingSplit = new HashSet<>();
         this.splitGenerator = new FileStoreSourceSplitGenerator();
         this.scan = scan;
@@ -205,9 +210,11 @@ public class ContinuousFileSplitEnumerator
                                 readersAwaitingSplit.remove(task);
                                 return;
                             }
-                            assignment
-                                    .computeIfAbsent(task, i -> new ArrayList<>())
-                                    .add(splits.poll());
+                            List<FileStoreSourceSplit> taskAssignment =
+                                    assignment.computeIfAbsent(task, i -> new ArrayList<>());
+                            if (taskAssignment.size() < splitBatchSize) {
+                                taskAssignment.add(splits.poll());
+                            }
                         }
                     }
                 });
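
The hunk above caps how many splits one subtask receives per assignment pass; anything beyond the cap stays queued until that reader sends its next split request, which is meant to keep each assignment message below `akka.framesize` (see the new field's comment). A minimal stand-alone sketch of that batching behaviour, using plain JDK types and hypothetical names in place of Paimon's FileStoreSourceSplit, might look like this:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;

    // Hypothetical sketch, not the actual Paimon enumerator.
    public class BatchedSplitAssignmentSketch {

        // Hands out at most splitBatchSize splits per request; the rest stay queued.
        static List<String> assignBatch(Queue<String> pendingSplits, int subtask, int splitBatchSize) {
            Map<Integer, List<String>> assignment = new HashMap<>();
            while (!pendingSplits.isEmpty()) {
                List<String> taskAssignment =
                        assignment.computeIfAbsent(subtask, i -> new ArrayList<>());
                if (taskAssignment.size() >= splitBatchSize) {
                    break; // batch is full; keep the remainder for the next request
                }
                taskAssignment.add(pendingSplits.poll());
            }
            return assignment.getOrDefault(subtask, new ArrayList<>());
        }

        public static void main(String[] args) {
            Queue<String> splits = new ArrayDeque<>();
            for (int i = 1; i <= 18; i++) {
                splits.add("split-" + i);
            }
            System.out.println(assignBatch(splits, 0, 10).size()); // 10
            System.out.println(assignBatch(splits, 0, 10).size()); // 8
            System.out.println(assignBatch(splits, 0, 10).size()); // 0
        }
    }
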
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 09abb116d..e9c14ca15 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -75,12 +76,15 @@ public class ContinuousFileStoreSource extends FlinkSource {
             nextSnapshotId = checkpoint.currentSnapshotId();
             splits = checkpoint.splits();
         }
-
+        CoreOptions coreOptions = CoreOptions.fromMap(options);
         return new ContinuousFileSplitEnumerator(
                 context,
                 splits,
                 nextSnapshotId,
-                CoreOptions.fromMap(options).continuousDiscoveryInterval().toMillis(),
+                coreOptions.continuousDiscoveryInterval().toMillis(),
+                coreOptions
+                        .toConfiguration()
+                        .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
                 scanFactory.create(readBuilder, nextSnapshotId));
     }
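
In the hunk above, the enumerator's batch size is read from the connector options alongside the discovery interval. Below is a hedged sketch of resolving that value from a plain options map, reusing only the calls that appear in this commit (CoreOptions.fromMap, toConfiguration, and FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE); the override of 10 is an arbitrary example value:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.flink.FlinkConnectorOptions;

    // Sketch only: shows how the batch size option could be resolved from a
    // table-options map, mirroring the wiring in ContinuousFileStoreSource.
    public class SplitBatchSizeSketch {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            // Override the enumerator batch size; omit the entry to fall back
            // to the option's default value.
            tableOptions.put(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE.key(), "10");

            int splitBatchSize =
                    CoreOptions.fromMap(tableOptions)
                            .toConfiguration()
                            .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
            System.out.println(splitBatchSize); // 10
        }
    }
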
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index a06cd198e..1c9ea6b71 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -104,6 +104,45 @@ public class ContinuousFileSplitEnumeratorTest {
         assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(2, 4));
     }
 
+    @Test
+    public void testSplitWithBatch() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(1);
+        context.registerReader(0, "test-host");
+
+        List<FileStoreSourceSplit> initialSplits = new ArrayList<>();
+        for (int i = 1; i <= 18; i++) {
+            initialSplits.add(createSnapshotSplit(i, i, Collections.emptyList()));
+        }
+        final ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(initialSplits)
+                        .setDiscoveryInterval(3)
+                        .setSplitBatchSize(10)
+                        .build();
+
+        // The first split request is capped at the batch size: 10 splits are assigned.
+        enumerator.handleSplitRequest(0, "test-host");
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        // Only subtask-0 has an assignment.
+        assertThat(assignments).containsOnlyKeys(0);
+        assertThat(assignments.get(0).getAssignedSplits()).hasSize(10);
+
+        // The second request assigns the remaining 8 splits (18 in total).
+        enumerator.handleSplitRequest(0, "test-host");
+
+        assertThat(assignments).containsOnlyKeys(0);
+        assertThat(assignments.get(0).getAssignedSplits()).hasSize(18);
+
+        // The third request assigns nothing more; no splits are left.
+        enumerator.handleSplitRequest(0, "test-host");
+
+        assertThat(assignments).containsOnlyKeys(0);
+        assertThat(assignments.get(0).getAssignedSplits()).hasSize(18);
+    }
+
     @Test
     public void testSplitAllocationIsFair() {
         final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
@@ -227,6 +266,8 @@ public class ContinuousFileSplitEnumeratorTest {
         private SplitEnumeratorContext<FileStoreSourceSplit> context;
         private Collection<FileStoreSourceSplit> initialSplits = Collections.emptyList();
         private long discoveryInterval = Long.MAX_VALUE;
+
+        private int splitBatchSize = 10;
         private StreamDataTableScan scan;
 
         public Builder setSplitEnumeratorContext(
@@ -245,6 +286,11 @@ public class ContinuousFileSplitEnumeratorTest {
             return this;
         }
 
+        public Builder setSplitBatchSize(int splitBatchSize) {
+            this.splitBatchSize = splitBatchSize;
+            return this;
+        }
+
         public Builder setScan(StreamDataTableScan scan) {
             this.scan = scan;
             return this;
@@ -252,7 +298,7 @@ public class ContinuousFileSplitEnumeratorTest {
 
         public ContinuousFileSplitEnumerator build() {
             return new ContinuousFileSplitEnumerator(
-                    context, initialSplits, null, discoveryInterval, scan);
+                    context, initialSplits, null, discoveryInterval, splitBatchSize, scan);
         }
     }