You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by ni...@apache.org on 2023/03/22 08:00:10 UTC
[incubator-paimon] branch master updated: [flink] Limit should not rely on assign all splits in StaticFileStoreSplitEnumerator (#680)
This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2fa01148c [flink] Limit should not rely on assign all splits in StaticFileStoreSplitEnumerator (#680)
2fa01148c is described below
commit 2fa01148ca79d2a6015f755e80590fd6203eef5e
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Mar 22 16:00:05 2023 +0800
[flink] Limit should not rely on assign all splits in StaticFileStoreSplitEnumerator (#680)
---
.../paimon/flink/source/FileStoreSourceReader.java | 15 +++++++-
.../flink/source/FileStoreSourceSplitReader.java | 19 +++++----
.../apache/paimon/flink/source/RecordLimiter.java | 45 ++++++++++++++++++++++
.../source/StaticFileStoreSplitEnumerator.java | 7 +---
.../source/FileStoreSourceSplitReaderTest.java | 5 ++-
5 files changed, 77 insertions(+), 14 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 8e30b02d1..a001bbf5a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -39,8 +39,21 @@ public final class FileStoreSourceReader<T>
SourceReaderContext readerContext,
TableRead tableRead,
@Nullable Long limit) {
+ this(
+ recordsFunction,
+ readerContext,
+ tableRead,
+ limit == null ? null : new RecordLimiter(limit));
+ }
+
+ private FileStoreSourceReader(
+ RecordsFunction<T> recordsFunction,
+ SourceReaderContext readerContext,
+ TableRead tableRead,
+ @Nullable RecordLimiter limiter) {
+ // limiter is created in SourceReader, it can be shared in all split readers
super(
- () -> new FileStoreSourceSplitReader<>(recordsFunction, tableRead, limit),
+ () -> new FileStoreSourceSplitReader<>(recordsFunction, tableRead, limiter),
recordsFunction,
readerContext.getConfiguration(),
readerContext);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 69600ef9c..dff3e6e19 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -48,7 +48,7 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
private final TableRead tableRead;
- @Nullable private final Long limit;
+ @Nullable private final RecordLimiter limiter;
private final Queue<FileStoreSourceSplit> splits;
@@ -56,15 +56,16 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
@Nullable private LazyRecordReader currentReader;
@Nullable private String currentSplitId;
- private long totalNumRead;
private long currentNumRead;
private RecordIterator<InternalRow> currentFirstBatch;
public FileStoreSourceSplitReader(
- RecordsFunction<T> recordsFunction, TableRead tableRead, @Nullable Long limit) {
+ RecordsFunction<T> recordsFunction,
+ TableRead tableRead,
+ @Nullable RecordLimiter limiter) {
this.recordsFunction = recordsFunction;
this.tableRead = tableRead;
- this.limit = limit;
+ this.limiter = limiter;
this.splits = new LinkedList<>();
this.pool = new Pool<>(1);
this.pool.add(new FileStoreRecordIterator());
@@ -93,7 +94,7 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
}
private boolean reachLimit() {
- return limit != null && totalNumRead >= limit;
+ return limiter != null && limiter.reachLimit();
}
private FileStoreRecordIterator pool() throws IOException {
@@ -142,7 +143,9 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
currentSplitId = nextSplit.splitId();
currentReader = new LazyRecordReader(nextSplit.split());
currentNumRead = nextSplit.recordsToSkip();
- totalNumRead += currentNumRead;
+ if (limiter != null) {
+ limiter.add(currentNumRead);
+ }
if (currentNumRead > 0) {
seek(currentNumRead);
}
@@ -212,7 +215,9 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo
recordAndPosition.setNext(new FlinkRowData(row));
currentNumRead++;
- totalNumRead++;
+ if (limiter != null) {
+ limiter.increment();
+ }
return recordAndPosition;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RecordLimiter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RecordLimiter.java
new file mode 100644
index 000000000..afaac6a82
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/RecordLimiter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A limiter to limit record reading. */
+public class RecordLimiter {
+    // One instance may be shared by all split readers of a source reader, hence AtomicLong.
+    private final AtomicLong counter = new AtomicLong(0);
+    private final long limit;
+
+    public RecordLimiter(long limit) {
+        this.limit = limit;
+    }
+
+    public boolean reachLimit() {
+        return limit <= counter.get();
+    }
+
+    public void increment() {
+        counter.addAndGet(1L);
+    }
+
+    /** Counts {@code delta} records at once, e.g. records skipped when resuming a split. */
+    public void add(long delta) {
+        counter.getAndAdd(delta);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index d048cb6d1..1ce87566f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -77,12 +77,9 @@ public class StaticFileStoreSplitEnumerator
}
// The following batch assignment operation is for two purposes:
- // 1. To distribute splits evenly when batch reading to prevent a few tasks from reading all
+ // To distribute splits evenly when batch reading to prevent a few tasks from reading all
// the data (for example, the current resource can only schedule part of the tasks).
- // 2. Optimize limit reading. In limit reading, the task will repeatedly create SplitFetcher
- // to read the data of the limit number for each coming split (the limit status is in the
- // SplitFetcher). So if the assigment are divided too small, the task will cost more time on
- // creating SplitFetcher and reading data.
+        // TODO: the assignment is already computed in the constructor; here we could just assign one batch at a time.
List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask);
if (splits != null && splits.size() > 0) {
context.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtask, splits)));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
index d447739c4..e52cc294a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
@@ -105,7 +105,10 @@ public class FileStoreSourceSplitReaderTest {
private FileStoreSourceSplitReader<RecordAndPosition<RowData>> createReader(
TableRead tableRead, @Nullable Long limit) {
- return new FileStoreSourceSplitReader<>(RecordsFunction.forIterate(), tableRead, limit);
+ return new FileStoreSourceSplitReader<>(
+ RecordsFunction.forIterate(),
+ tableRead,
+ limit == null ? null : new RecordLimiter(limit));
}
private void innerTestOnce(boolean valueCountMode, int skip) throws Exception {