Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/29 10:25:52 UTC

[flink-table-store] branch master updated: [FLINK-28727] Flink Source supports `SupportsLimitPushDown`

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 779afb51 [FLINK-28727] Flink Source supports `SupportsLimitPushDown`
779afb51 is described below

commit 779afb51190ca30ede9e4027972f5596523c409b
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jul 29 03:25:46 2022 -0700

    [FLINK-28727] Flink Source supports `SupportsLimitPushDown`
    
    This closes #255
---
 .../store/connector/source/FileStoreSource.java    |  8 +++-
 .../connector/source/FileStoreSourceReader.java    |  7 +++-
 .../source/FileStoreSourceSplitReader.java         |  8 +++-
 .../store/connector/source/FlinkSourceBuilder.java |  9 +++-
 .../store/connector/source/TableStoreSource.java   | 23 ++++++++++-
 .../source/FileStoreSourceReaderTest.java          |  3 +-
 .../source/FileStoreSourceSplitReaderTest.java     | 48 ++++++++++++++++++----
 7 files changed, 91 insertions(+), 15 deletions(-)
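
For context: SupportsLimitPushDown is the Flink table ability interface that
lets the planner hand a query's LIMIT down to the source, so the scan can stop
early instead of reading everything and discarding rows downstream. A minimal
sketch of the planner-side contract (the helper class and method name here are
illustrative, not part of this commit; the interface and applyLimit(long) are
real Flink APIs):

    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;

    final class LimitPushDownSketch {
        // If the source advertises the ability, the planner calls applyLimit(n)
        // during optimization; TableStoreSource (below) stores that limit and
        // later threads it through FlinkSourceBuilder into FileStoreSource.
        static void pushLimitIfSupported(DynamicTableSource source, long limit) {
            if (source instanceof SupportsLimitPushDown) {
                ((SupportsLimitPushDown) source).applyLimit(limit);
            }
        }
    }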

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index cd2c7a28..b54cbde1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -58,19 +58,23 @@ public class FileStoreSource
 
     @Nullable private final Predicate predicate;
 
+    @Nullable private final Long limit;
+
     public FileStoreSource(
             FileStoreTable table,
             boolean isContinuous,
             long discoveryInterval,
             boolean latestContinuous,
             @Nullable int[][] projectedFields,
-            @Nullable Predicate predicate) {
+            @Nullable Predicate predicate,
+            @Nullable Long limit) {
         this.table = table;
         this.isContinuous = isContinuous;
         this.discoveryInterval = discoveryInterval;
         this.latestContinuous = latestContinuous;
         this.projectedFields = projectedFields;
         this.predicate = predicate;
+        this.limit = limit;
     }
 
     @Override
@@ -87,7 +91,7 @@ public class FileStoreSource
         if (predicate != null) {
             read.withFilter(predicate);
         }
-        return new FileStoreSourceReader(context, read);
+        return new FileStoreSourceReader(context, read, limit);
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
index 6c837fd3..20ededda 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
@@ -25,6 +25,8 @@ import org.apache.flink.connector.file.src.util.RecordAndPosition;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.table.source.TableRead;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 
 /** A {@link SourceReader} that read records from {@link FileStoreSourceSplit}. */
@@ -35,9 +37,10 @@ public final class FileStoreSourceReader
                 FileStoreSourceSplit,
                 FileStoreSourceSplitState> {
 
-    public FileStoreSourceReader(SourceReaderContext readerContext, TableRead tableRead) {
+    public FileStoreSourceReader(
+            SourceReaderContext readerContext, TableRead tableRead, @Nullable Long limit) {
         super(
-                () -> new FileStoreSourceSplitReader(tableRead),
+                () -> new FileStoreSourceSplitReader(tableRead, limit),
                 (element, output, splitState) -> {
                     output.collect(element.getRecord());
                     splitState.setPosition(element);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
index 3795259b..4a8ae0bd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -43,6 +43,8 @@ public class FileStoreSourceSplitReader
 
     private final TableRead tableRead;
 
+    @Nullable private final Long limit;
+
     private final Queue<FileStoreSourceSplit> splits;
 
     private final Pool<FileStoreRecordIterator> pool;
@@ -52,8 +54,9 @@ public class FileStoreSourceSplitReader
     private long currentNumRead;
     private RecordReader.RecordIterator<RowData> currentFirstBatch;
 
-    public FileStoreSourceSplitReader(TableRead tableRead) {
+    public FileStoreSourceSplitReader(TableRead tableRead, @Nullable Long limit) {
         this.tableRead = tableRead;
+        this.limit = limit;
         this.splits = new LinkedList<>();
         this.pool = new Pool<>(1);
         this.pool.add(new FileStoreRecordIterator());
@@ -176,6 +179,9 @@ public class FileStoreSourceSplitReader
         @Nullable
         @Override
         public RecordAndPosition<RowData> next() {
+            if (limit != null && currentNumRead >= limit) {
+                return null;
+            }
             RowData row;
             try {
                 row = iterator.next();
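
The hunk above is where the limit actually takes effect: each
FileStoreSourceSplitReader counts the records it has emitted (currentNumRead)
and returns null once the pushed-down limit is reached, ending the current
batch. Note the check is local to one reader, so with parallel readers the
pushed limit caps each reader individually; the planner's own Limit operator
still performs the final global trim. A self-contained sketch of the same
counting pattern (illustrative names, not the commit's code):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    /** Iterator wrapper that stops after {@code limit} elements when limit is non-null. */
    final class LimitingIterator<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        private final Long limit; // nullable, mirroring the reader's @Nullable field
        private long numRead;

        LimitingIterator(Iterator<T> delegate, Long limit) {
            this.delegate = delegate;
            this.limit = limit;
        }

        @Override
        public boolean hasNext() {
            // Same guard as the reader's next(): stop once the count hits the limit.
            return (limit == null || numRead < limit) && delegate.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            numRead++;
            return delegate.next();
        }
    }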
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 38c5c8ce..c3f565a6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -60,6 +60,7 @@ public class FlinkSourceBuilder {
     @Nullable private Predicate predicate;
     @Nullable private LogSourceProvider logSourceProvider;
     @Nullable private Integer parallelism;
+    @Nullable private Long limit;
     @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
 
     public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, FileStoreTable table) {
@@ -88,6 +89,11 @@ public class FlinkSourceBuilder {
         return this;
     }
 
+    public FlinkSourceBuilder withLimit(@Nullable Long limit) {
+        this.limit = limit;
+        return this;
+    }
+
     public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
         this.logSourceProvider = logSourceProvider;
         return this;
@@ -115,7 +121,8 @@ public class FlinkSourceBuilder {
                 discoveryIntervalMills(),
                 continuousScanLatest,
                 projectedFields,
-                predicate);
+                predicate,
+                limit);
     }
 
     private Source<RowData, ?, ?> buildSource() {
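
Usage-wise, the builder change is just another fluent setter. A sketch of how
a caller might wire it (the helper class is hypothetical; obtaining a
FileStoreTable is assumed to happen elsewhere, and the import paths follow the
module layout shown in this diff):

    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.store.table.FileStoreTable;

    final class BuilderSketch {
        // Hypothetical helper: forward a pushed-down limit to the source builder.
        static FlinkSourceBuilder sourceWithLimit(FileStoreTable table, Long limit) {
            return new FlinkSourceBuilder(
                            ObjectIdentifier.of("my_catalog", "my_db", "my_table"), table)
                    .withLimit(limit);
        }
    }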
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 8914e93c..2969f980 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
 import org.apache.flink.table.data.RowData;
@@ -65,6 +66,7 @@ public class TableStoreSource
         implements ScanTableSource,
                 SupportsFilterPushDown,
                 SupportsProjectionPushDown,
+                SupportsLimitPushDown,
                 SupportsWatermarkPushDown {
 
     private final ObjectIdentifier tableIdentifier;
@@ -75,6 +77,7 @@ public class TableStoreSource
 
     @Nullable private Predicate predicate;
     @Nullable private int[][] projectFields;
+    @Nullable private Long limit;
 
     @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
 
@@ -84,7 +87,16 @@ public class TableStoreSource
             boolean streaming,
             DynamicTableFactory.Context context,
             @Nullable LogStoreTableFactory logStoreTableFactory) {
-        this(tableIdentifier, table, streaming, context, logStoreTableFactory, null, null, null);
+        this(
+                tableIdentifier,
+                table,
+                streaming,
+                context,
+                logStoreTableFactory,
+                null,
+                null,
+                null,
+                null);
     }
 
     private TableStoreSource(
@@ -95,6 +107,7 @@ public class TableStoreSource
             @Nullable LogStoreTableFactory logStoreTableFactory,
             @Nullable Predicate predicate,
             @Nullable int[][] projectFields,
+            @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
         this.tableIdentifier = tableIdentifier;
         this.table = table;
@@ -103,6 +116,7 @@ public class TableStoreSource
         this.logStoreTableFactory = logStoreTableFactory;
         this.predicate = predicate;
         this.projectFields = projectFields;
+        this.limit = limit;
         this.watermarkStrategy = watermarkStrategy;
     }
 
@@ -151,6 +165,7 @@ public class TableStoreSource
                         .withLogSourceProvider(logSourceProvider)
                         .withProjection(projectFields)
                         .withPredicate(predicate)
+                        .withLimit(limit)
                         .withParallelism(
                                 Configuration.fromMap(table.schema().options())
                                         .get(FlinkConnectorOptions.SCAN_PARALLELISM))
@@ -170,6 +185,7 @@ public class TableStoreSource
                 logStoreTableFactory,
                 predicate,
                 projectFields,
+                limit,
                 watermarkStrategy);
     }
 
@@ -203,4 +219,9 @@ public class TableStoreSource
     public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
         this.watermarkStrategy = watermarkStrategy;
     }
+
+    @Override
+    public void applyLimit(long limit) {
+        this.limit = limit;
+    }
 }
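
End to end, the new ability means a bounded query with a LIMIT no longer scans
the full table. A sketch of the user-facing effect (assumes a table store
table named t is already registered in the session):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    final class LimitQuerySketch {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // With this commit, the planner calls TableStoreSource#applyLimit(2),
            // so each split reader stops after two records instead of reading all.
            env.executeSql("SELECT * FROM t LIMIT 2").print();
        }
    }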
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index 6025b735..a7d9d2dc 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -60,7 +60,8 @@ public class FileStoreSourceReaderTest {
     private FileStoreSourceReader createReader(TestingReaderContext context) {
         return new FileStoreSourceReader(
                 context,
-                new TestChangelogDataReadWrite(tempDir.toString(), null).createReadWithKey());
+                new TestChangelogDataReadWrite(tempDir.toString(), null).createReadWithKey(),
+                null);
     }
 
     private static FileStoreSourceSplit createTestFileSplit() {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 8da3af35..32a40de1 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -91,7 +91,8 @@ public class FileStoreSourceSplitReaderTest {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
         FileStoreSourceSplitReader reader =
                 new FileStoreSourceSplitReader(
-                        valueCountMode ? rw.createReadWithValueCount() : rw.createReadWithKey());
+                        valueCountMode ? rw.createReadWithValueCount() : rw.createReadWithKey(),
+                        null);
 
         List<Tuple2<Long, Long>> input = kvs();
         List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
@@ -132,7 +133,8 @@ public class FileStoreSourceSplitReaderTest {
     @Test
     public void testPrimaryKeyWithDelete() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
-        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+        FileStoreSourceSplitReader reader =
+                new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input = kvs();
         RecordWriter<KeyValue> writer = rw.createMergeTreeWriter(row(1), 0);
@@ -171,7 +173,8 @@ public class FileStoreSourceSplitReaderTest {
     @Test
     public void testMultipleBatchInSplit() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
-        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+        FileStoreSourceSplitReader reader =
+                new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input1 = kvs();
         List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
@@ -207,7 +210,8 @@ public class FileStoreSourceSplitReaderTest {
     @Test
     public void testRestore() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
-        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+        FileStoreSourceSplitReader reader =
+                new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input = kvs();
         List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
@@ -233,7 +237,8 @@ public class FileStoreSourceSplitReaderTest {
     @Test
     public void testRestoreMultipleBatchInSplit() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
-        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+        FileStoreSourceSplitReader reader =
+                new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input1 = kvs();
         List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
@@ -264,7 +269,8 @@ public class FileStoreSourceSplitReaderTest {
     @Test
     public void testMultipleSplits() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
-        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+        FileStoreSourceSplitReader reader =
+                new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input1 = kvs();
         List<DataFileMeta> files1 = rw.writeFiles(row(1), 0, input1);
@@ -302,11 +308,39 @@ public class FileStoreSourceSplitReaderTest {
     @Test
     public void testNoSplit() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
-        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(rw.createReadWithKey());
+        FileStoreSourceSplitReader reader =
+                new FileStoreSourceSplitReader(rw.createReadWithKey(), null);
         assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining");
         reader.close();
     }
 
+    @Test
+    public void testLimit() throws Exception {
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+        FileStoreSourceSplitReader reader =
+                new FileStoreSourceSplitReader(rw.createReadWithKey(), 2L);
+
+        List<Tuple2<Long, Long>> input = kvs();
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
+
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 0));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+
+        List<Tuple2<RowKind, Long>> expected =
+                input.stream()
+                        .map(t -> new Tuple2<>(RowKind.INSERT, t.f1))
+                        .collect(Collectors.toList());
+
+        List<Tuple2<RowKind, Long>> result = readRecords(records, "id1", 0);
+        assertThat(result).isEqualTo(expected.subList(0, 2));
+
+        records = reader.fetch();
+        assertRecords(records, null, "id1", 0, Collections.emptyList());
+
+        reader.close();
+    }
+
     private void assertRecords(
             RecordsWithSplitIds<RecordAndPosition<RowData>> records,
             String finishedSplit,