You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by ni...@apache.org on 2023/03/22 08:00:10 UTC

[incubator-paimon] branch master updated: [flink] Limit should not rely on assign all splits in StaticFileStoreSplitEnumerator (#680)

This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fa01148c [flink] Limit should not rely on assign all splits in StaticFileStoreSplitEnumerator (#680)
2fa01148c is described below

commit 2fa01148ca79d2a6015f755e80590fd6203eef5e
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Mar 22 16:00:05 2023 +0800

    [flink] Limit should not rely on assign all splits in StaticFileStoreSplitEnumerator (#680)
---
 .../paimon/flink/source/FileStoreSourceReader.java | 15 +++++++-
 .../flink/source/FileStoreSourceSplitReader.java   | 19 +++++----
 .../apache/paimon/flink/source/RecordLimiter.java  | 45 ++++++++++++++++++++++
 .../source/StaticFileStoreSplitEnumerator.java     |  7 +---
 .../source/FileStoreSourceSplitReaderTest.java     |  5 ++-
 5 files changed, 77 insertions(+), 14 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 8e30b02d1..a001bbf5a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -39,8 +39,21 @@ public final class FileStoreSourceReader<T>
             SourceReaderContext readerContext,
             TableRead tableRead,
             @Nullable Long limit) {
+        this(
+                recordsFunction,
+                readerContext,
+                tableRead,
+                limit == null ? null : new RecordLimiter(limit));
+    }
+
+    private FileStoreSourceReader(
+            RecordsFunction<T> recordsFunction,
+            SourceReaderContext readerContext,
+            TableRead tableRead,
+            @Nullable RecordLimiter limiter) {
+        // limiter is created in SourceReader, it can be shared in all split readers
         super(
-                () -> new FileStoreSourceSplitReader<>(recordsFunction, tableRead, limit),
+                () -> new FileStoreSourceSplitReader<>(recordsFunction, tableRead, limiter),
                 recordsFunction,
                 readerContext.getConfiguration(),
                 readerContext);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 69600ef9c..dff3e6e19 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -48,7 +48,7 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
 
     private final TableRead tableRead;
 
-    @Nullable private final Long limit;
+    @Nullable private final RecordLimiter limiter;
 
     private final Queue<FileStoreSourceSplit> splits;
 
@@ -56,15 +56,16 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
 
     @Nullable private LazyRecordReader currentReader;
     @Nullable private String currentSplitId;
-    private long totalNumRead;
     private long currentNumRead;
     private RecordIterator<InternalRow> currentFirstBatch;
 
     public FileStoreSourceSplitReader(
-            RecordsFunction<T> recordsFunction, TableRead tableRead, @Nullable Long limit) {
+            RecordsFunction<T> recordsFunction,
+            TableRead tableRead,
+            @Nullable RecordLimiter limiter) {
         this.recordsFunction = recordsFunction;
         this.tableRead = tableRead;
-        this.limit = limit;
+        this.limiter = limiter;
         this.splits = new LinkedList<>();
         this.pool = new Pool<>(1);
         this.pool.add(new FileStoreRecordIterator());
@@ -93,7 +94,7 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
     }
 
     private boolean reachLimit() {
-        return limit != null && totalNumRead >= limit;
+        return limiter != null && limiter.reachLimit(); // null limiter: no limit was configured
     }
 
     private FileStoreRecordIterator pool() throws IOException {
@@ -142,7 +143,9 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
         currentSplitId = nextSplit.splitId();
         currentReader = new LazyRecordReader(nextSplit.split());
         currentNumRead = nextSplit.recordsToSkip();
-        totalNumRead += currentNumRead;
+        if (limiter != null) {
+            limiter.add(currentNumRead);
+        }
         if (currentNumRead > 0) {
             seek(currentNumRead);
         }
@@ -212,7 +215,9 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
 
             recordAndPosition.setNext(new FlinkRowData(row));
             currentNumRead++;
-            totalNumRead++;
+            if (limiter != null) {
+                limiter.increment();
+            }
             return recordAndPosition;
         }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RecordLimiter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RecordLimiter.java
new file mode 100644
index 000000000..afaac6a82
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RecordLimiter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Record counter, shareable across split readers, that reports when a limit is reached. */
+public class RecordLimiter {
+
+    private final long limit; // count at or above which reachLimit() returns true
+    private final AtomicLong counter; // records counted so far; atomic since it may be shared
+
+    public RecordLimiter(long limit) {
+        this.limit = limit;
+        this.counter = new AtomicLong(0);
+    }
+
+    public boolean reachLimit() { // true once counted records >= limit
+        return counter.get() >= limit;
+    }
+
+    public void increment() { // counts a single record
+        counter.incrementAndGet();
+    }
+
+    public void add(long delta) { // counts delta records at once, e.g. a split's records to skip
+        counter.addAndGet(delta);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index d048cb6d1..1ce87566f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -77,12 +77,9 @@ public class StaticFileStoreSplitEnumerator
         }
 
         // The following batch assignment operation is for two purposes:
-        // 1. To distribute splits evenly when batch reading to prevent a few tasks from reading all
+        // To distribute splits evenly when batch reading to prevent a few tasks from reading all
         // the data (for example, the current resource can only schedule part of the tasks).
-        // 2. Optimize limit reading. In limit reading, the task will repeatedly create SplitFetcher
-        // to read the data of the limit number for each coming split (the limit status is in the
-        // SplitFetcher). So if the assigment are divided too small, the task will cost more time on
-        // creating SplitFetcher and reading data.
+        // TODO: assignment is already created in constructor, here can just assign per batch
         List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask);
         if (splits != null && splits.size() > 0) {
             context.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtask, splits)));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
index d447739c4..e52cc294a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
@@ -105,7 +105,10 @@ public class FileStoreSourceSplitReaderTest {
 
     private FileStoreSourceSplitReader<RecordAndPosition<RowData>> createReader(
             TableRead tableRead, @Nullable Long limit) {
-        return new FileStoreSourceSplitReader<>(RecordsFunction.forIterate(), tableRead, limit);
+        return new FileStoreSourceSplitReader<>(
+                RecordsFunction.forIterate(),
+                tableRead,
+                limit == null ? null : new RecordLimiter(limit)); // no limit -> no limiter
     }
 
     private void innerTestOnce(boolean valueCountMode, int skip) throws Exception {