Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/15 14:20:02 UTC

[flink-table-store] branch master updated: [FLINK-26475] FileStoreScan supports incremental consuming

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bbffef  [FLINK-26475] FileStoreScan supports incremental consuming
7bbffef is described below

commit 7bbffef4fb26048500d37dfc42bdf478df0de73e
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Mar 15 22:14:34 2022 +0800

    [FLINK-26475] FileStoreScan supports incremental consuming
    
    This closes #43
---
 .../flink/table/store/connector/TableStore.java    |  16 +-
 .../source/ContinuousFileSplitEnumerator.java      | 239 +++++++++++++++++++++
 .../store/connector/source/FileStoreSource.java    |  61 +++++-
 .../source/StaticFileStoreSplitEnumerator.java     |  10 -
 .../table/store/connector/FileStoreITCase.java     |  77 ++++++-
 .../table/store/connector/FiniteTestSource.java    |  23 +-
 .../store/connector/ReadWriteTableITCase.java      |  14 +-
 .../flink/table/store/file/FileStoreOptions.java   |   6 +
 .../table/store/file/operation/FileStoreScan.java  |   4 +
 .../store/file/operation/FileStoreScanImpl.java    |  26 ++-
 10 files changed, 425 insertions(+), 51 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 534cbe7..ac8a3a0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -59,6 +59,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
 
 /** A table store api to create source and sink. */
 @Experimental
@@ -207,11 +208,17 @@ public class TableStore {
             return this;
         }
 
-        private FileStoreSource buildFileStoreSource() {
+        private long discoveryIntervalMillis() {
+            return options.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
+        }
+
+        private FileStoreSource buildFileSource(boolean isContinuous) {
             FileStore fileStore = buildFileStore();
             return new FileStoreSource(
                     fileStore,
                     primaryKeys.length == 0,
+                    isContinuous,
+                    discoveryIntervalMillis(),
                     projectedFields,
                     partitionPredicate,
                     fieldPredicate);
@@ -220,15 +227,14 @@ public class TableStore {
         public Source<RowData, ?, ?> build() {
             if (isContinuous) {
                 if (logSourceProvider == null) {
-                    throw new UnsupportedOperationException(
-                            "File store continuous mode is not supported yet.");
+                    return buildFileSource(true);
                 }
 
                 // TODO project log source
 
                 if (isHybrid) {
                     return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
-                                    buildFileStoreSource())
+                                    buildFileSource(false))
                             .addSource(
                                     new LogHybridSourceFactory(logSourceProvider),
                                     Boundedness.CONTINUOUS_UNBOUNDED)
@@ -237,7 +243,7 @@ public class TableStore {
                     return logSourceProvider.createSource(null);
                 }
             } else {
-                return buildFileStoreSource();
+                return buildFileSource(false);
             }
         }
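
With this change, continuous mode without a configured log source falls back to
the file store source instead of throwing. A minimal usage sketch, assuming a
fully configured TableStore named store and a STREAMING-mode
StreamExecutionEnvironment named env (the builder calls are taken from this
commit and its tests; the surrounding wiring is hypothetical):

    // Read the table as an unbounded, continuously discovered stream.
    CloseableIterator<RowData> iterator =
            store.sourceBuilder()
                    .withContinuousMode(true)
                    .build(env)
                    .executeAndCollect();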
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
new file mode 100644
index 0000000..35e42cf
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An enumerator that continuously monitors for new snapshots and assigns their splits. */
+public class ContinuousFileSplitEnumerator
+        implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<FileStoreSourceSplit> context;
+
+    private final FileStoreScan scan;
+
+    private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
+
+    private final long discoveryInterval;
+
+    private final Set<Integer> readersAwaitingSplit;
+
+    private final FileStoreSourceSplitGenerator splitGenerator;
+
+    private final SnapshotEnumerator snapshotEnumerator;
+
+    private Long currentSnapshotId;
+
+    public ContinuousFileSplitEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            FileStoreScan scan,
+            Collection<FileStoreSourceSplit> remainingSplits,
+            long currentSnapshotId,
+            long discoveryInterval) {
+        checkArgument(discoveryInterval > 0L);
+        this.context = checkNotNull(context);
+        this.scan = checkNotNull(scan);
+        this.bucketSplits = new HashMap<>();
+        addSplits(remainingSplits);
+        this.currentSnapshotId = currentSnapshotId;
+        this.discoveryInterval = discoveryInterval;
+        this.readersAwaitingSplit = new HashSet<>();
+        this.splitGenerator = new FileStoreSourceSplitGenerator();
+        this.snapshotEnumerator = new SnapshotEnumerator(currentSnapshotId);
+    }
+
+    private void addSplits(Collection<FileStoreSourceSplit> splits) {
+        splits.forEach(this::addSplit);
+    }
+
+    private void addSplit(FileStoreSourceSplit split) {
+        bucketSplits.computeIfAbsent(split.bucket(), i -> new LinkedList<>()).add(split);
+    }
+
+    @Override
+    public void start() {
+        context.callAsync(
+                snapshotEnumerator,
+                this::processDiscoveredSplits,
+                discoveryInterval,
+                discoveryInterval);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // no resources to close
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        // this source is purely lazy-pull-based, nothing to do upon registration
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        readersAwaitingSplit.add(subtaskId);
+        assignSplits();
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        LOG.error("Received unrecognized event: {}", sourceEvent);
+    }
+
+    @Override
+    public void addSplitsBack(List<FileStoreSourceSplit> splits, int subtaskId) {
+        LOG.debug("File Source Enumerator adds splits back: {}", splits);
+        addSplits(splits);
+    }
+
+    @Override
+    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
+        List<FileStoreSourceSplit> splits = new ArrayList<>();
+        bucketSplits.values().forEach(splits::addAll);
+        final PendingSplitsCheckpoint checkpoint =
+                new PendingSplitsCheckpoint(
+                        splits, currentSnapshotId == null ? INVALID_SNAPSHOT : currentSnapshotId);
+
+        LOG.debug("Source Checkpoint is {}", checkpoint);
+        return checkpoint;
+    }
+
+    // ------------------------------------------------------------------------
+
+    private void processDiscoveredSplits(@Nullable EnumeratorResult result, Throwable error) {
+        if (error != null) {
+            LOG.error("Failed to enumerate files", error);
+            return;
+        }
+
+        if (result == null) {
+            return;
+        }
+
+        currentSnapshotId = result.snapshotId;
+        addSplits(result.splits);
+        assignSplits();
+    }
+
+    private void assignSplits() {
+        bucketSplits.forEach(
+                (bucket, splits) -> {
+                    if (splits.size() > 0) {
+                        // To preserve consumption order, all splits of the
+                        // same bucket are assigned to the same task.
+                        int task = bucket % context.currentParallelism();
+                        if (readersAwaitingSplit.remove(task)) {
+                            // if the reader that requested another split has
+                            // failed in the meantime, remove it from the list
+                            // of waiting readers
+                            if (!context.registeredReaders().containsKey(task)) {
+                                return;
+                            }
+                        }
+                        context.assignSplit(splits.poll(), task);
+                    }
+                });
+    }
+
+    private class SnapshotEnumerator implements Callable<EnumeratorResult> {
+
+        private long nextSnapshotId;
+
+        private SnapshotEnumerator(long currentSnapshot) {
+            this.nextSnapshotId = currentSnapshot + 1;
+        }
+
+        @Nullable
+        @Override
+        public EnumeratorResult call() {
+            // TODO sync with processDiscoveredSplits to avoid too many splits in memory
+            while (true) {
+                if (!scan.snapshotExists(nextSnapshotId)) {
+                    // TODO check latest snapshot id, expired?
+                    LOG.debug(
+                            "Next snapshot id {} not exists, wait for it to be generated.",
+                            nextSnapshotId);
+                    return null;
+                }
+
+                Snapshot snapshot = scan.snapshot(nextSnapshotId);
+                if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
+                    if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+                        throw new UnsupportedOperationException(
+                                "Invalid overwrite snapshot id: " + nextSnapshotId);
+                    }
+
+                    LOG.debug(
+                            "Next snapshot id {} is not APPEND, but {}, checking the next one.",
+                            nextSnapshotId,
+                            snapshot.commitKind());
+                    nextSnapshotId++;
+                    continue;
+                }
+
+                List<FileStoreSourceSplit> splits =
+                        splitGenerator.createSplits(scan.withSnapshot(nextSnapshotId).plan());
+                EnumeratorResult result = new EnumeratorResult(nextSnapshotId, splits);
+                LOG.debug("Find snapshot id {}, it has {} splits.", nextSnapshotId, splits.size());
+
+                nextSnapshotId++;
+                return result;
+            }
+        }
+    }
+
+    private static class EnumeratorResult {
+
+        private final long snapshotId;
+
+        private final List<FileStoreSourceSplit> splits;
+
+        private EnumeratorResult(long snapshotId, List<FileStoreSourceSplit> splits) {
+            this.snapshotId = snapshotId;
+            this.splits = splits;
+        }
+    }
+}
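
The enumerator keys pending splits by bucket and routes every split of a bucket
to the same subtask via bucket % parallelism, which preserves per-bucket
consumption order. A self-contained illustration of that routing rule (the
parallelism and bucket counts are arbitrary examples, not values from the
commit):

    // Bucket-to-subtask routing as in assignSplits(): deterministic and sticky.
    int parallelism = 3; // stands in for context.currentParallelism()
    for (int bucket = 0; bucket < 6; bucket++) {
        System.out.printf("bucket %d -> subtask %d%n", bucket, bucket % parallelism);
    }
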
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 6a72c55..36b7c68 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -33,6 +33,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
+
 import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
 
 /** {@link Source} of file store. */
@@ -45,6 +47,10 @@ public class FileStoreSource
 
     private final boolean valueCountMode;
 
+    private final boolean isContinuous;
+
+    private final long discoveryInterval;
+
     @Nullable private final int[][] projectedFields;
 
     @Nullable private final Predicate partitionPredicate;
@@ -54,11 +60,15 @@ public class FileStoreSource
     public FileStoreSource(
             FileStore fileStore,
             boolean valueCountMode,
+            boolean isContinuous,
+            long discoveryInterval,
             @Nullable int[][] projectedFields,
             @Nullable Predicate partitionPredicate,
             final Predicate fieldPredicate) {
         this.fileStore = fileStore;
         this.valueCountMode = valueCountMode;
+        this.isContinuous = isContinuous;
+        this.discoveryInterval = discoveryInterval;
         this.projectedFields = projectedFields;
         this.partitionPredicate = partitionPredicate;
         this.fieldPredicate = fieldPredicate;
@@ -73,20 +83,34 @@ public class FileStoreSource
     @Override
     public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
         FileStoreRead read = fileStore.newRead();
+
+        if (isContinuous) {
+            read.withDropDelete(false);
+        }
+
         if (projectedFields != null) {
             if (valueCountMode) {
-                // TODO don't project keys, and add key projection to split reader
+                // TODO when isContinuous is false, don't project keys, and add key projection to
+                // split reader
                 read.withKeyProjection(projectedFields);
             } else {
                 read.withValueProjection(projectedFields);
             }
         }
+
         return new FileStoreSourceReader(context, read, valueCountMode);
     }
 
     @Override
     public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context) {
+        return restoreEnumerator(context, null);
+    }
+
+    @Override
+    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
         if (partitionPredicate != null) {
             scan.withPartitionFilter(partitionPredicate);
@@ -98,18 +122,33 @@ public class FileStoreSource
                 scan.withValueFilter(fieldPredicate);
             }
         }
-        return new StaticFileStoreSplitEnumerator(context, scan);
-    }
 
-    @Override
-    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
-            SplitEnumeratorContext<FileStoreSourceSplit> context,
-            PendingSplitsCheckpoint checkpoint) {
-        Snapshot snapshot = null;
-        if (checkpoint.currentSnapshotId() != INVALID_SNAPSHOT) {
-            snapshot = fileStore.newScan().snapshot(checkpoint.currentSnapshotId());
+        Long snapshotId;
+        Collection<FileStoreSourceSplit> splits;
+        if (checkpoint == null) {
+            FileStoreScan.Plan plan = scan.plan();
+            snapshotId = plan.snapshotId();
+            splits = new FileStoreSourceSplitGenerator().createSplits(plan);
+        } else {
+            snapshotId = checkpoint.currentSnapshotId();
+            if (snapshotId == INVALID_SNAPSHOT) {
+                snapshotId = null;
+            }
+            splits = checkpoint.splits();
+        }
+
+        if (isContinuous) {
+            long currentSnapshot = snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
+            return new ContinuousFileSplitEnumerator(
+                    context,
+                    scan.withIncremental(true),
+                    splits,
+                    currentSnapshot,
+                    discoveryInterval);
+        } else {
+            Snapshot snapshot = snapshotId == null ? null : scan.snapshot(snapshotId);
+            return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
-        return new StaticFileStoreSplitEnumerator(context, snapshot, checkpoint.splits());
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index 4fc828b..c289a60 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.connector.source;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
 
 import javax.annotation.Nullable;
 
@@ -52,15 +51,6 @@ public class StaticFileStoreSplitEnumerator
         this.splits = new LinkedList<>(splits);
     }
 
-    public StaticFileStoreSplitEnumerator(
-            SplitEnumeratorContext<FileStoreSourceSplit> context, FileStoreScan scan) {
-        this.context = context;
-        FileStoreScan.Plan plan = scan.plan();
-        Long snapshotId = plan.snapshotId();
-        this.snapshot = snapshotId == null ? null : scan.snapshot(snapshotId);
-        this.splits = new LinkedList<>(new FileStoreSourceSplitGenerator().createSplits(plan));
-    }
-
     @Override
     public void start() {
         // no resources to start
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 7c499d3..a0d5185 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 
 import org.junit.Assume;
@@ -55,6 +56,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
@@ -210,6 +212,78 @@ public class FileStoreITCase extends AbstractTestBase {
         assertThat(results).containsExactlyInAnyOrder(expected);
     }
 
+    @Test
+    public void testContinuous() throws Exception {
+        store.withPrimaryKeys(new int[] {2});
+        innerTestContinuous();
+    }
+
+    @Test
+    public void testContinuousWithoutPK() throws Exception {
+        store.withPrimaryKeys(new int[0]);
+        innerTestContinuous();
+    }
+
+    private void innerTestContinuous() throws Exception {
+        Assume.assumeFalse(isBatch);
+
+        CloseableIterator<RowData> iterator =
+                store.sourceBuilder().withContinuousMode(true).build(env).executeAndCollect();
+        Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
+
+        sinkAndValidate(
+                Arrays.asList(
+                        srcRow(RowKind.INSERT, 1, "p1", 1), srcRow(RowKind.INSERT, 2, "p2", 2)),
+                iterator,
+                Row.ofKind(RowKind.INSERT, 1, "p1", 1),
+                Row.ofKind(RowKind.INSERT, 2, "p2", 2));
+
+        sinkAndValidate(
+                Arrays.asList(
+                        srcRow(RowKind.DELETE, 1, "p1", 1), srcRow(RowKind.INSERT, 3, "p3", 3)),
+                iterator,
+                Row.ofKind(RowKind.DELETE, 1, "p1", 1),
+                Row.ofKind(RowKind.INSERT, 3, "p3", 3));
+    }
+
+    private void sinkAndValidate(
+            List<RowData> src, CloseableIterator<RowData> iterator, Row... expected)
+            throws Exception {
+        if (isBatch) {
+            throw new UnsupportedOperationException();
+        }
+        DataStreamSource<RowData> source =
+                env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE));
+        store.sinkBuilder().withInput(source).build();
+        env.execute();
+        assertThat(collectFromUnbounded(iterator, expected.length))
+                .containsExactlyInAnyOrder(expected);
+    }
+
+    private static RowData srcRow(RowKind kind, int v, String p, int k) {
+        return wrap(GenericRowData.ofKind(kind, v, StringData.fromString(p), k));
+    }
+
+    private List<Row> collectFromUnbounded(CloseableIterator<RowData> iterator, int numElements) {
+        if (numElements == 0) {
+            return Collections.emptyList();
+        }
+
+        List<Row> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            result.add(CONVERTER.toExternal(iterator.next()));
+
+            if (result.size() == numElements) {
+                return result;
+            }
+        }
+
+        throw new IllegalArgumentException(
+                String.format(
+                        "The stream ended before reaching the requested %d records. Only %d records were received.",
+                        numElements, result.size()));
+    }
+
     public static StreamExecutionEnvironment buildStreamEnv() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
@@ -251,7 +325,8 @@ public class FileStoreITCase extends AbstractTestBase {
         return isBatch
                 ? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE))
                 : env.addSource(
-                        new FiniteTestSource<>(SOURCE_DATA), InternalTypeInfo.of(TABLE_TYPE));
+                        new FiniteTestSource<>(SOURCE_DATA, false),
+                        InternalTypeInfo.of(TABLE_TYPE));
     }
 
     public static List<Row> executeAndCollect(DataStreamSource<RowData> source) throws Exception {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
index 0e60cea..c37643e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
@@ -38,14 +38,14 @@ import java.util.List;
  *
  * <p>The reason this class is rewritten is to support {@link CheckpointedFunction}.
  */
-@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
 public class FiniteTestSource<T>
         implements SourceFunction<T>, CheckpointedFunction, CheckpointListener {
 
     private static final long serialVersionUID = 1L;
 
-    @SuppressWarnings("NonSerializableFieldInSerializableClass")
-    private final Iterable<T> elements;
+    private final List<T> elements;
+
+    private final boolean emitOnce;
 
     private volatile boolean running = true;
 
@@ -55,8 +55,9 @@ public class FiniteTestSource<T>
 
     private volatile int numTimesEmitted;
 
-    public FiniteTestSource(Iterable<T> elements) {
+    public FiniteTestSource(List<T> elements, boolean emitOnce) {
         this.elements = elements;
+        this.emitOnce = emitOnce;
     }
 
     @Override
@@ -92,11 +93,11 @@ public class FiniteTestSource<T>
     public void run(SourceContext<T> ctx) throws Exception {
         switch (numTimesEmitted) {
             case 0:
-                emitElementsAndWaitForCheckpoints(ctx);
-                emitElementsAndWaitForCheckpoints(ctx);
+                emitElementsAndWaitForCheckpoints(ctx, false);
+                emitElementsAndWaitForCheckpoints(ctx, true);
                 break;
             case 1:
-                emitElementsAndWaitForCheckpoints(ctx);
+                emitElementsAndWaitForCheckpoints(ctx, true);
                 break;
             case 2:
                 // Maybe missed notifyCheckpointComplete, wait next notifyCheckpointComplete
@@ -111,15 +112,17 @@ public class FiniteTestSource<T>
         }
     }
 
-    private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx)
+    private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx, boolean isSecond)
             throws InterruptedException {
         final Object lock = ctx.getCheckpointLock();
 
         final int checkpointToAwait;
         synchronized (lock) {
             checkpointToAwait = numCheckpointsComplete + 2;
-            for (T t : elements) {
-                ctx.collect(t);
+            if (!isSecond || !emitOnce) {
+                for (T t : elements) {
+                    ctx.collect(t);
+                }
             }
             numTimesEmitted++;
         }
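
The new emitOnce flag makes this test source emit its elements only in the
first of its checkpoint-aligned rounds, so the continuous tests observe each
change exactly once. A usage sketch mirroring the tests above (SOURCE_DATA and
TABLE_TYPE are the existing test fixtures):

    // emitOnce = true: emit once but still wait through the remaining
    // checkpoints; emitOnce = false keeps the original emit-twice behavior.
    env.addSource(new FiniteTestSource<>(SOURCE_DATA, true), InternalTypeInfo.of(TABLE_TYPE));
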
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index eff7b77..1caf5e9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -146,19 +146,7 @@ public class ReadWriteTableITCase extends TableStoreTestBase {
                 });
         // TODO: streaming with change-tracking
 
-        // exception case
-        specs.add(
-                new Object[] {
-                    RuntimeExecutionMode.STREAMING,
-                    "table_" + UUID.randomUUID(),
-                    false, // enable change-tracking
-                    false, // has pk
-                    null, //  with duplicate
-                    new ExpectedResult()
-                            .success(false)
-                            .expectedType(UnsupportedOperationException.class)
-                            .expectedMessage("File store continuous mode is not supported yet.")
-                });
+        // TODO: streaming without log system
 
         // TODO: add overwrite case
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 41b0490..aec50ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -97,6 +97,12 @@ public class FileStoreOptions implements Serializable {
                     .defaultValue(Duration.ofDays(1))
                     .withDescription("The maximum time of completed snapshots to retain.");
 
+    public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
+            ConfigOptions.key("continuous.discovery-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription("The discovery interval of continuous reading.");
+
     private final Configuration options;
 
     public static Set<ConfigOption<?>> allOptions() {
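
The new continuous.discovery-interval option (default 1s) controls how often
the continuous enumerator polls for new snapshots. A configuration sketch using
the plain Flink Configuration API (the five-second value is an arbitrary
example; imports: org.apache.flink.configuration.Configuration,
java.time.Duration):

    // Trade higher discovery latency for less file system polling.
    Configuration conf = new Configuration();
    conf.set(FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL, Duration.ofSeconds(5));
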
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 06dad34..3718dd2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -38,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 /** Scan operation which produces a plan. */
 public interface FileStoreScan {
 
+    boolean snapshotExists(long snapshotId);
+
     Snapshot snapshot(long snapshotId);
 
     FileStoreScan withPartitionFilter(Predicate predicate);
@@ -54,6 +56,8 @@ public interface FileStoreScan {
 
     FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
 
+    FileStoreScan withIncremental(boolean isIncremental);
+
     /** Produce a {@link Plan}. */
     Plan plan();
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index ab1dd15..bccb0f2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -37,6 +38,8 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -62,6 +65,7 @@ public class FileStoreScanImpl implements FileStoreScan {
     private Long specifiedSnapshotId = null;
     private Integer specifiedBucket = null;
     private List<ManifestFileMeta> specifiedManifests = null;
+    private boolean isIncremental = false;
 
     public FileStoreScanImpl(
             RowType partitionType,
@@ -75,6 +79,16 @@ public class FileStoreScanImpl implements FileStoreScan {
     }
 
     @Override
+    public boolean snapshotExists(long snapshotId) {
+        Path path = pathFactory.toSnapshotPath(snapshotId);
+        try {
+            return path.getFileSystem().exists(path);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
     public Snapshot snapshot(long snapshotId) {
         return Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
     }
@@ -149,6 +163,12 @@ public class FileStoreScanImpl implements FileStoreScan {
     }
 
     @Override
+    public FileStoreScan withIncremental(boolean isIncremental) {
+        this.isIncremental = isIncremental;
+        return this;
+    }
+
+    @Override
     public Plan plan() {
         List<ManifestFileMeta> manifests = specifiedManifests;
         Long snapshotId = specifiedSnapshotId;
@@ -159,7 +179,11 @@ public class FileStoreScanImpl implements FileStoreScan {
             if (snapshotId == null) {
                 manifests = Collections.emptyList();
             } else {
-                manifests = snapshot(snapshotId).readAllManifests(manifestList);
+                Snapshot snapshot = snapshot(snapshotId);
+                manifests =
+                        isIncremental
+                                ? manifestList.read(snapshot.deltaManifestList())
+                                : snapshot.readAllManifests(manifestList);
             }
         }