You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/29 10:25:52 UTC
[flink-table-store] branch master updated: [FLINK-28727] Flink Source supports `SupportsLimitPushDown`
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 779afb51 [FLINK-28727] Flink Source supports `SupportsLimitPushDown`
779afb51 is described below
commit 779afb51190ca30ede9e4027972f5596523c409b
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jul 29 03:25:46 2022 -0700
[FLINK-28727] Flink Source supports `SupportsLimitPushDown`
This closes #255
---
.../store/connector/source/FileStoreSource.java | 8 +++-
.../connector/source/FileStoreSourceReader.java | 7 +++-
.../source/FileStoreSourceSplitReader.java | 8 +++-
.../store/connector/source/FlinkSourceBuilder.java | 9 +++-
.../store/connector/source/TableStoreSource.java | 23 ++++++++++-
.../source/FileStoreSourceReaderTest.java | 3 +-
.../source/FileStoreSourceSplitReaderTest.java | 48 ++++++++++++++++++----
7 files changed, 91 insertions(+), 15 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index cd2c7a28..b54cbde1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -58,19 +58,23 @@ public class FileStoreSource
@Nullable private final Predicate predicate;
+ @Nullable private final Long limit;
+
public FileStoreSource(
FileStoreTable table,
boolean isContinuous,
long discoveryInterval,
boolean latestContinuous,
@Nullable int[][] projectedFields,
- @Nullable Predicate predicate) {
+ @Nullable Predicate predicate,
+ @Nullable Long limit) {
this.table = table;
this.isContinuous = isContinuous;
this.discoveryInterval = discoveryInterval;
this.latestContinuous = latestContinuous;
this.projectedFields = projectedFields;
this.predicate = predicate;
+ this.limit = limit;
}
@Override
@@ -87,7 +91,7 @@ public class FileStoreSource
if (predicate != null) {
read.withFilter(predicate);
}
- return new FileStoreSourceReader(context, read);
+ return new FileStoreSourceReader(context, read, limit);
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
index 6c837fd3..20ededda 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
@@ -25,6 +25,8 @@ import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.table.source.TableRead;
+import javax.annotation.Nullable;
+
import java.util.Map;
/** A {@link SourceReader} that read records from {@link FileStoreSourceSplit}. */
@@ -35,9 +37,10 @@ public final class FileStoreSourceReader
FileStoreSourceSplit,
FileStoreSourceSplitState> {
- public FileStoreSourceReader(SourceReaderContext readerContext, TableRead tableRead) {
+ public FileStoreSourceReader(
+ SourceReaderContext readerContext, TableRead tableRead, @Nullable Long limit) {
super(
- () -> new FileStoreSourceSplitReader(tableRead),
+ () -> new FileStoreSourceSplitReader(tableRead, limit),
(element, output, splitState) -> {
output.collect(element.getRecord());
splitState.setPosition(element);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
index 3795259b..4a8ae0bd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -43,6 +43,8 @@ public class FileStoreSourceSplitReader
private final TableRead tableRead;
+ @Nullable private final Long limit;
+
private final Queue<FileStoreSourceSplit> splits;
private final Pool<FileStoreRecordIterator> pool;
@@ -52,8 +54,9 @@ public class FileStoreSourceSplitReader
private long currentNumRead;
private RecordReader.RecordIterator<RowData> currentFirstBatch;
- public FileStoreSourceSplitReader(TableRead tableRead) {
+ public FileStoreSourceSplitReader(TableRead tableRead, @Nullable Long limit) {
this.tableRead = tableRead;
+ this.limit = limit;
this.splits = new LinkedList<>();
this.pool = new Pool<>(1);
this.pool.add(new FileStoreRecordIterator());
@@ -176,6 +179,9 @@ public class FileStoreSourceSplitReader
@Nullable
@Override
public RecordAndPosition<RowData> next() {
+ if (limit != null && currentNumRead >= limit) {
+ return null;
+ }
RowData row;
try {
row = iterator.next();
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 38c5c8ce..c3f565a6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -60,6 +60,7 @@ public class FlinkSourceBuilder {
@Nullable private Predicate predicate;
@Nullable private LogSourceProvider logSourceProvider;
@Nullable private Integer parallelism;
+ @Nullable private Long limit;
@Nullable private WatermarkStrategy<RowData> watermarkStrategy;
public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, FileStoreTable table) {
@@ -88,6 +89,11 @@ public class FlinkSourceBuilder {
return this;
}
+ public FlinkSourceBuilder withLimit(@Nullable Long limit) {
+ this.limit = limit;
+ return this;
+ }
+
public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
this.logSourceProvider = logSourceProvider;
return this;
@@ -115,7 +121,8 @@ public class FlinkSourceBuilder {
discoveryIntervalMills(),
continuousScanLatest,
projectedFields,
- predicate);
+ predicate,
+ limit);
}
private Source<RowData, ?, ?> buildSource() {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 8914e93c..2969f980 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
@@ -65,6 +66,7 @@ public class TableStoreSource
implements ScanTableSource,
SupportsFilterPushDown,
SupportsProjectionPushDown,
+ SupportsLimitPushDown,
SupportsWatermarkPushDown {
private final ObjectIdentifier tableIdentifier;
@@ -75,6 +77,7 @@ public class TableStoreSource
@Nullable private Predicate predicate;
@Nullable private int[][] projectFields;
+ @Nullable private Long limit;
@Nullable private WatermarkStrategy<RowData> watermarkStrategy;
@@ -84,7 +87,16 @@ public class TableStoreSource
boolean streaming,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
- this(tableIdentifier, table, streaming, context, logStoreTableFactory, null, null, null);
+ this(
+ tableIdentifier,
+ table,
+ streaming,
+ context,
+ logStoreTableFactory,
+ null,
+ null,
+ null,
+ null);
}
private TableStoreSource(
@@ -95,6 +107,7 @@ public class TableStoreSource
@Nullable LogStoreTableFactory logStoreTableFactory,
@Nullable Predicate predicate,
@Nullable int[][] projectFields,
+ @Nullable Long limit,
@Nullable WatermarkStrategy<RowData> watermarkStrategy) {
this.tableIdentifier = tableIdentifier;
this.table = table;
@@ -103,6 +116,7 @@ public class TableStoreSource
this.logStoreTableFactory = logStoreTableFactory;
this.predicate = predicate;
this.projectFields = projectFields;
+ this.limit = limit;
this.watermarkStrategy = watermarkStrategy;
}
@@ -151,6 +165,7 @@ public class TableStoreSource
.withLogSourceProvider(logSourceProvider)
.withProjection(projectFields)
.withPredicate(predicate)
+ .withLimit(limit)
.withParallelism(
Configuration.fromMap(table.schema().options())
.get(FlinkConnectorOptions.SCAN_PARALLELISM))
@@ -170,6 +185,7 @@ public class TableStoreSource
logStoreTableFactory,
predicate,
projectFields,
+ limit,
watermarkStrategy);
}
@@ -203,4 +219,9 @@ public class TableStoreSource
public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}
+
+ @Override
+ public void applyLimit(long limit) {
+ this.limit = limit;
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index 6025b735..a7d9d2dc 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -60,7 +60,8 @@ public class FileStoreSourceReaderTest {
private FileStoreSourceReader createReader(TestingReaderContext context) {
return new FileStoreSourceReader(
context,
- new TestChangelogDataReadWrite(tempDir.toString(), null).createReadWithKey());
+ new TestChangelogDataReadWrite(tempDir.toString(), null).createReadWithKey(),
+ null);
}
private static FileStoreSourceSplit createTestFileSplit() {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 8da3af35..32a40de1 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -91,7 +91,8 @@ public class FileStoreSourceSplitReaderTest {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
FileStoreSourceSplitReader reader =
new FileStoreSourceSplitReader(
- valueCountMode ? rw.createReadWithValueCount() : rw.createReadWithKey());
+ valueCountMode ? rw.createReadWithValueCount() : rw.createReadWithKey(),
+ null);
List<Tuple2<Long, Long>> input = kvs();
List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
@@ -132,7 +133,8 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testPrimaryKeyWithDelete() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
- FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+ FileStoreSourceSplitReader reader =
+ new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
List<Tuple2<Long, Long>> input = kvs();
RecordWriter<KeyValue> writer = rw.createMergeTreeWriter(row(1), 0);
@@ -171,7 +173,8 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testMultipleBatchInSplit() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
- FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+ FileStoreSourceSplitReader reader =
+ new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
List<Tuple2<Long, Long>> input1 = kvs();
List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
@@ -207,7 +210,8 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testRestore() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
- FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+ FileStoreSourceSplitReader reader =
+ new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
List<Tuple2<Long, Long>> input = kvs();
List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
@@ -233,7 +237,8 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testRestoreMultipleBatchInSplit() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
- FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+ FileStoreSourceSplitReader reader =
+ new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
List<Tuple2<Long, Long>> input1 = kvs();
List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
@@ -264,7 +269,8 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testMultipleSplits() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
- FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+ FileStoreSourceSplitReader reader =
+ new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
List<Tuple2<Long, Long>> input1 = kvs();
List<DataFileMeta> files1 = rw.writeFiles(row(1), 0, input1);
@@ -302,11 +308,39 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testNoSplit() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
- FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+ FileStoreSourceSplitReader reader =
+ new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining");
reader.close();
}
+ @Test
+ public void testLimit() throws Exception {
+ TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+ FileStoreSourceSplitReader reader =
+ new FileStoreSourceSplitReader(rw.createReadWithKey(), 2L);
+
+ List<Tuple2<Long, Long>> input = kvs();
+ List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
+
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 0));
+
+ RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+
+ List<Tuple2<RowKind, Long>> expected =
+ input.stream()
+ .map(t -> new Tuple2<>(RowKind.INSERT, t.f1))
+ .collect(Collectors.toList());
+
+ List<Tuple2<RowKind, Long>> result = readRecords(records, "id1", 0);
+ assertThat(result).isEqualTo(expected.subList(0, 2));
+
+ records = reader.fetch();
+ assertRecords(records, null, "id1", 0, Collections.emptyList());
+
+ reader.close();
+ }
+
private void assertRecords(
RecordsWithSplitIds<RecordAndPosition<RowData>> records,
String finishedSplit,