Posted to commits@flink.apache.org by lz...@apache.org on 2022/12/08 11:51:44 UTC

[flink-table-store] branch master updated: [FLINK-30293] Introduce StaticDataFileSnapshotEnumerator and change read.compacted to a scan mode

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e7690d7c [FLINK-30293] Introduce StaticDataFileSnapshotEnumerator and change read.compacted to a scan mode
e7690d7c is described below

commit e7690d7c40c9a4371feafc170a147c5b03f70358
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Dec 8 19:51:39 2022 +0800

    [FLINK-30293] Introduce StaticDataFileSnapshotEnumerator and change read.compacted to a scan mode
    
    This closes #425
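    
    For users migrating existing pipelines: compacted-only reads are now
    selected through the scan mode rather than a separate boolean option.
    A minimal before/after sketch, assuming a Flink TableEnvironment named
    `tEnv` and an illustrative table `T`:
    
        // before this commit (the option is removed by this change):
        tEnv.executeSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */");
        // after this commit:
        tEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') */");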
---
 .../shortcodes/generated/core_configuration.html   |   8 +-
 .../source/ContinuousFileStoreSource.java          |   4 +-
 .../connector/source/StaticFileStoreSource.java    |  22 +-
 .../store/connector/BatchFileStoreITCase.java      |   4 +-
 .../connector/FullCompactionFileStoreITCase.java   |   6 +-
 .../source/StaticFileStoreSplitEnumeratorTest.java | 108 ---------
 .../org/apache/flink/table/store/CoreOptions.java  |  23 +-
 .../table/store/file/AppendOnlyFileStore.java      |   3 +-
 .../flink/table/store/file/KeyValueFileStore.java  |   3 +-
 .../file/operation/AbstractFileStoreScan.java      |  10 +-
 .../file/operation/AppendOnlyFileStoreScan.java    |   6 +-
 .../file/operation/KeyValueFileStoreScan.java      |   6 +-
 .../store/table/source/TableStreamingReader.java   |  13 +-
 .../source/snapshot/CompactedStartingScanner.java  |  45 ++++
 ...ava => CompactionChangelogFollowUpScanner.java} |  32 +--
 .../ContinuousDataFileSnapshotEnumerator.java      | 147 ++++++++++++
 .../ContinuousFromTimestampStartingScanner.java    |  54 +++++
 .../snapshot/ContinuousLatestStartingScanner.java  |  47 ++++
 .../snapshot/DataFileSnapshotEnumerator.java       | 159 -------------
 ...otEnumerator.java => DeltaFollowUpScanner.java} |  27 +--
 .../table/source/snapshot/FollowUpScanner.java     |  30 +++
 .../table/source/snapshot/FullStartingScanner.java |  45 ++++
 ...tor.java => InputChangelogFollowUpScanner.java} |  31 +--
 .../table/source/snapshot/StartingScanner.java     |  28 +++
 .../snapshot/StaticDataFileSnapshotEnumerator.java |  77 +++++++
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  12 +-
 .../table/store/table/FileStoreTableTestBase.java  |  56 -----
 .../snapshot/CompactedStartingScannerTest.java     |  99 ++++++++
 .../CompactionChangelogFollowUpScannerTest.java    | 111 +++++++++
 ... ContinuousDataFileSnapshotEnumeratorTest.java} | 129 +++--------
 ...ContinuousFromTimestampStartingScannerTest.java | 107 +++++++++
 .../ContinuousLatestStartingScannerTest.java       |  70 ++++++
 .../source/snapshot/DeltaFollowUpScannerTest.java  |  87 +++++++
 ...lCompactionChangelogSnapshotEnumeratorTest.java | 256 ---------------------
 .../source/snapshot/FullStartingScannerTest.java   |  73 ++++++
 .../InputChangelogFollowUpScannerTest.java         |  97 ++++++++
 .../InputChangelogSnapshotEnumeratorTest.java      | 239 -------------------
 ...stBase.java => SnapshotEnumeratorTestBase.java} |  11 +-
 .../StaticDataFileSnapshotEnumeratorTest.java      |  72 ++++++
 39 files changed, 1313 insertions(+), 1044 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 14c721f3..0d7d205b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -188,17 +188,11 @@
             <td>String</td>
             <td>The default partition name in case the dynamic partition column value is null/empty string.</td>
         </tr>
-        <tr>
-            <td><h5>read.compacted</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to read compacted snapshot only.</td>
-        </tr>
         <tr>
             <td><h5>scan.mode</h5></td>
             <td style="word-wrap: break-word;">deafult</td>
             <td><p>Enum</p></td>
-            <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp". Otherwise the actual startup mode will be "full".</li><li>"full": For streaming sources, produces a snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot but doe [...]
+            <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp". Otherwise the actual startup mode will be "full".</li><li>"full": For streaming sources, produces the latest snapshot on the table upon first startup, and continues to read the latest changes. For batch sources, just produces the lates [...]
         </tr>
         <tr>
             <td><h5>scan.timestamp-millis</h5></td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
index 581d746c..bf9dd90d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.snapshot.DataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
 
 import javax.annotation.Nullable;
 
@@ -76,6 +76,6 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 splits,
                 nextSnapshotId,
                 discoveryInterval,
-                DataFileSnapshotEnumerator.create(table, scan, nextSnapshotId));
+                ContinuousDataFileSnapshotEnumerator.create(table, scan, nextSnapshotId));
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
index 2223bbad..e3f67955 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
@@ -26,9 +26,12 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.StaticDataFileSnapshotEnumerator;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 /** Bounded {@link FlinkSource} for reading records. It does not monitor new snapshots. */
@@ -62,12 +65,23 @@ public class StaticFileStoreSource extends FlinkSource {
             scan.withFilter(predicate);
         }
 
-        Long snapshotId;
+        Long snapshotId = null;
         Collection<FileStoreSourceSplit> splits;
         if (checkpoint == null) {
-            DataTableScan.DataFilePlan plan = scan.plan();
-            snapshotId = plan.snapshotId;
-            splits = new FileStoreSourceSplitGenerator().createSplits(plan);
+            splits = new ArrayList<>();
+            FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
+
+            // read all splits from the enumerator in one go
+            SnapshotEnumerator snapshotEnumerator =
+                    StaticDataFileSnapshotEnumerator.create(table, table.newScan());
+            while (true) {
+                DataTableScan.DataFilePlan plan = snapshotEnumerator.enumerate();
+                if (plan == null) {
+                    break;
+                }
+                snapshotId = plan.snapshotId;
+                splits.addAll(splitGenerator.createSplits(plan));
+            }
         } else {
             // restore from checkpoint
             snapshotId = checkpoint.currentSnapshotId();
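
For context: the static source now drains its enumerator in a loop, keeping the
last plan's snapshot id and collecting all generated splits. A self-contained
sketch of that drain-until-null pattern (the generic `Enumerator` below is a
hypothetical stand-in, not a table store class):

    import java.util.ArrayList;
    import java.util.List;

    public class DrainExample {

        interface Enumerator<P> {
            P enumerate(); // returns null once nothing further is available
        }

        static <P> List<P> drain(Enumerator<P> enumerator) {
            List<P> plans = new ArrayList<>();
            P plan;
            while ((plan = enumerator.enumerate()) != null) {
                plans.add(plan);
            }
            return plans;
        }
    }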
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index 6355859f..adb21bb9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -45,8 +45,8 @@ public class BatchFileStoreITCase extends FileStoreTableITCase {
     }
 
     @Test
-    public void testReadCompactedEmpty() {
+    public void testCompactedScanModeEmpty() {
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */")).isEmpty();
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') */")).isEmpty();
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
index b17f43c3..81d7f3bc 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
@@ -55,11 +55,11 @@ public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
     }
 
     @Test
-    public void testReadCompacted() throws Exception {
+    public void testCompactedScanMode() throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
                         streamSqlIter(
-                                "SELECT * FROM %s /*+ OPTIONS('read.compacted'='true') */", table));
+                                "SELECT * FROM %s /*+ OPTIONS('scan.mode'='compacted') */", table));
 
         batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
         assertThat(iterator.collect(2))
@@ -68,7 +68,7 @@ public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
         batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
 
-        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('read.compacted'='true') */"))
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') */"))
                 .containsExactlyInAnyOrder(
                         Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
deleted file mode 100644
index 8b28c155..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.source;
-
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-
-import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
-import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
-import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Unit tests for the {@link StaticFileStoreSplitEnumerator}. */
-public class StaticFileStoreSplitEnumeratorTest {
-
-    @Test
-    public void testCheckpointNoSplitRequested() {
-        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
-                new TestingSplitEnumeratorContext<>(4);
-        final FileStoreSourceSplit split = createRandomSplit();
-        final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split);
-
-        final PendingSplitsCheckpoint checkpoint = enumerator.snapshotState(1L);
-
-        assertThat(checkpoint.splits()).contains(split);
-    }
-
-    @Test
-    public void testSplitRequestForRegisteredReader() {
-        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
-                new TestingSplitEnumeratorContext<>(4);
-        final FileStoreSourceSplit split = createRandomSplit();
-        final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split);
-
-        context.registerReader(3, "somehost");
-        enumerator.addReader(3);
-        enumerator.handleSplitRequest(3, "somehost");
-
-        assertThat(enumerator.snapshotState(1L).splits()).isEmpty();
-        assertThat(context.getSplitAssignments().get(3).getAssignedSplits()).contains(split);
-    }
-
-    @Test
-    public void testSplitRequestForNonRegisteredReader() {
-        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
-                new TestingSplitEnumeratorContext<>(4);
-        final FileStoreSourceSplit split = createRandomSplit();
-        final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split);
-
-        enumerator.handleSplitRequest(3, "somehost");
-
-        assertThat(context.getSplitAssignments().containsKey(3)).isFalse();
-        assertThat(enumerator.snapshotState(1L).splits()).contains(split);
-    }
-
-    @Test
-    public void testNoMoreSplits() {
-        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
-                new TestingSplitEnumeratorContext<>(4);
-        final FileStoreSourceSplit split = createRandomSplit();
-        final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split);
-
-        // first split assignment
-        context.registerReader(1, "somehost");
-        enumerator.addReader(1);
-        enumerator.handleSplitRequest(1, "somehost");
-
-        // second request has no more split
-        enumerator.handleSplitRequest(1, "somehost");
-
-        assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split);
-        assertThat(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
-    }
-
-    // ------------------------------------------------------------------------
-    //  test setup helpers
-    // ------------------------------------------------------------------------
-
-    private static FileStoreSourceSplit createRandomSplit() {
-        return newSourceSplit("split", row(1), 2, Arrays.asList(newFile(0), newFile(1)));
-    }
-
-    private static StaticFileStoreSplitEnumerator createEnumerator(
-            final SplitEnumeratorContext<FileStoreSourceSplit> context,
-            final FileStoreSourceSplit... splits) {
-        return new StaticFileStoreSplitEnumerator(context, null, Arrays.asList(splits));
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 7f43b0bb..744e7dee 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -173,12 +173,6 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether to skip compaction on write.");
 
-    public static final ConfigOption<Boolean> READ_COMPACTED =
-            ConfigOptions.key("read.compacted")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Whether to read compacted snapshot only.");
-
     public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
             ConfigOptions.key("source.split.target-size")
                     .memoryType()
@@ -569,10 +563,6 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_COMPACTION_SKIP);
     }
 
-    public boolean readCompacted() {
-        return options.get(READ_COMPACTED);
-    }
-
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
@@ -610,9 +600,9 @@ public class CoreOptions implements Serializable {
 
         FULL(
                 "full",
-                "For streaming sources, produces a snapshot on the table upon first startup,"
-                        + " and continue to read the latest changes. "
-                        + "For batch sources, just produce a snapshot but does not read new changes."),
+                "For streaming sources, produces the latest snapshot on the table upon first startup, "
+                        + "and continue to read the latest changes. "
+                        + "For batch sources, just produce the latest snapshot but does not read new changes."),
 
         LATEST(
                 "latest",
@@ -620,6 +610,13 @@ public class CoreOptions implements Serializable {
                         + "without producing a snapshot at the beginning. "
                         + "For batch sources, behaves the same as the \"full\" startup mode."),
 
+        COMPACTED(
+                "compacted",
+                "For streaming sources, produces a snapshot after the latest compaction on the table "
+                        + "upon first startup, and continue to read the latest changes. "
+                        + "For batch sources, just produce a snapshot after the latest compaction "
+                        + "but does not read new changes."),
+
         FROM_TIMESTAMP(
                 "from-timestamp",
                 "For streaming sources, continuously reads changes "
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index d9359d4e..0d889213 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -81,8 +81,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
                 manifestFileFactory(),
                 manifestListFactory(),
                 options.bucket(),
-                checkNumOfBuckets,
-                options.readCompacted());
+                checkNumOfBuckets);
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 802c1944..780eabb4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -111,8 +111,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 manifestFileFactory(),
                 manifestListFactory(),
                 options.bucket(),
-                checkNumOfBuckets,
-                options.readCompacted());
+                checkNumOfBuckets);
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index e7e563e0..3aafa8d8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -60,7 +60,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private final ManifestList manifestList;
     private final int numOfBuckets;
     private final boolean checkNumOfBuckets;
-    private final boolean readCompacted;
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
     private final SchemaManager schemaManager;
@@ -84,8 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
-            boolean checkNumOfBuckets,
-            boolean readCompacted) {
+            boolean checkNumOfBuckets) {
         this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType);
         this.partitionConverter = new RowDataToObjectArrayConverter(partitionType);
         Preconditions.checkArgument(
@@ -98,7 +96,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         this.manifestList = manifestListFactory.create();
         this.numOfBuckets = numOfBuckets;
         this.checkNumOfBuckets = checkNumOfBuckets;
-        this.readCompacted = readCompacted;
         this.tableSchemas = new ConcurrentHashMap<>();
     }
 
@@ -180,10 +177,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         Long snapshotId = specifiedSnapshotId;
         if (manifests == null) {
             if (snapshotId == null) {
-                snapshotId =
-                        readCompacted
-                                ? snapshotManager.latestCompactedSnapshotId()
-                                : snapshotManager.latestSnapshotId();
+                snapshotId = snapshotManager.latestSnapshotId();
             }
             if (snapshotId == null) {
                 manifests = Collections.emptyList();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index 86bcca3b..ac65b0ab 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -55,8 +55,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
-            boolean checkNumOfBuckets,
-            boolean readCompacted) {
+            boolean checkNumOfBuckets) {
         super(
                 partitionType,
                 bucketKeyType,
@@ -66,8 +65,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
-                checkNumOfBuckets,
-                readCompacted);
+                checkNumOfBuckets);
         this.schemaRowStatsConverters = new ConcurrentHashMap<>();
         this.rowType = rowType;
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index 68e44f0a..25391021 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -60,8 +60,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
-            boolean checkNumOfBuckets,
-            boolean readCompacted) {
+            boolean checkNumOfBuckets) {
         super(
                 partitionType,
                 bucketKeyType,
@@ -71,8 +70,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
-                checkNumOfBuckets,
-                readCompacted);
+                checkNumOfBuckets);
         this.keyValueFieldsExtractor = keyValueFieldsExtractor;
         this.schemaKeyStatsConverters = new ConcurrentHashMap<>();
         this.keyType = keyType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index b875d220..0a830780 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -19,13 +19,14 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateFilter;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.source.snapshot.DeltaSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.DeltaFollowUpScanner;
+import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
 import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
 import org.apache.flink.table.store.utils.TypeUtils;
 
@@ -88,8 +89,12 @@ public class TableStreamingReader {
             scan.withFilter(predicate);
         }
         enumerator =
-                new DeltaSnapshotEnumerator(
-                        table.location(), scan, CoreOptions.StartupMode.FULL, null, null);
+                new ContinuousDataFileSnapshotEnumerator(
+                        table.location(),
+                        scan,
+                        new FullStartingScanner(),
+                        new DeltaFollowUpScanner(),
+                        null);
     }
 
     @Nullable
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
new file mode 100644
index 00000000..1eea117c
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScanner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#COMPACTED} startup mode.
+ */
+public class CompactedStartingScanner implements StartingScanner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactedStartingScanner.class);
+
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long startingSnapshotId = snapshotManager.latestCompactedSnapshotId();
+        if (startingSnapshotId == null) {
+            LOG.debug("There is currently no compact snapshot. Waiting for snapshot generation.");
+            return null;
+        }
+        return scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
+    }
+}
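
A StartingScanner computes only the first plan of a read. A hedged usage
sketch, reusing constructors that appear elsewhere in this commit (`table` is
a FileStoreTable):

    SnapshotManager snapshotManager = new SnapshotManager(table.location());
    StartingScanner scanner = new CompactedStartingScanner();
    DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
    if (plan == null) {
        // no compacted snapshot exists yet; callers retry on a later enumeration
    }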
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScanner.java
similarity index 54%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScanner.java
index ed9e5f16..153bb59b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScanner.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.table.source.snapshot;
 
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.table.source.DataTableScan;
@@ -27,33 +25,17 @@ import org.apache.flink.table.store.table.source.DataTableScan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 /**
- * A {@link DataFileSnapshotEnumerator} which scans incremental changes in {@link
- * Snapshot#changelogManifestList()} for each newly created snapshots.
- *
- * <p>This enumerator looks for changelog files produced from full compaction. Can only be used when
- * {@link CoreOptions#CHANGELOG_PRODUCER} is set to {@link
- * CoreOptions.ChangelogProducer#FULL_COMPACTION}.
+ * {@link FollowUpScanner} for {@link
+ * org.apache.flink.table.store.CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog producer.
  */
-public class FullCompactionChangelogSnapshotEnumerator extends DataFileSnapshotEnumerator {
+public class CompactionChangelogFollowUpScanner implements FollowUpScanner {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(FullCompactionChangelogSnapshotEnumerator.class);
-
-    public FullCompactionChangelogSnapshotEnumerator(
-            Path tablePath,
-            DataTableScan scan,
-            int maxLevel,
-            CoreOptions.StartupMode startupMode,
-            @Nullable Long startupMillis,
-            @Nullable Long nextSnapshotId) {
-        super(tablePath, scan.withLevel(maxLevel), startupMode, startupMillis, nextSnapshotId);
-    }
+            LoggerFactory.getLogger(CompactionChangelogFollowUpScanner.class);
 
     @Override
-    protected boolean shouldReadSnapshot(Snapshot snapshot) {
+    public boolean shouldScanSnapshot(Snapshot snapshot) {
         if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
             return true;
         }
@@ -66,7 +48,7 @@ public class FullCompactionChangelogSnapshotEnumerator extends DataFileSnapshotE
     }
 
     @Override
-    protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
-        return scan.withKind(ScanKind.CHANGELOG).plan();
+    public DataTableScan.DataFilePlan getPlan(long snapshotId, DataTableScan scan) {
+        return scan.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).plan();
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
new file mode 100644
index 00000000..326f7e62
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/** {@link SnapshotEnumerator} for streaming read. */
+public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ContinuousDataFileSnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+    private final DataTableScan scan;
+    private final StartingScanner startingScanner;
+    private final FollowUpScanner followUpScanner;
+
+    private @Nullable Long nextSnapshotId;
+
+    public ContinuousDataFileSnapshotEnumerator(
+            Path tablePath,
+            DataTableScan scan,
+            StartingScanner startingScanner,
+            FollowUpScanner followUpScanner,
+            @Nullable Long nextSnapshotId) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.startingScanner = startingScanner;
+        this.followUpScanner = followUpScanner;
+
+        this.nextSnapshotId = nextSnapshotId;
+    }
+
+    @Nullable
+    @Override
+    public DataTableScan.DataFilePlan enumerate() {
+        if (nextSnapshotId == null) {
+            return tryFirstEnumerate();
+        } else {
+            return nextEnumerate();
+        }
+    }
+
+    private DataTableScan.DataFilePlan tryFirstEnumerate() {
+        DataTableScan.DataFilePlan plan = startingScanner.getPlan(snapshotManager, scan);
+        if (plan != null) {
+            nextSnapshotId = plan.snapshotId + 1;
+        }
+        return plan;
+    }
+
+    private DataTableScan.DataFilePlan nextEnumerate() {
+        while (true) {
+            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
+                LOG.debug(
+                        "Next snapshot id {} does not exist, wait for the snapshot generation.",
+                        nextSnapshotId);
+                return null;
+            }
+
+            Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
+
+            if (followUpScanner.shouldScanSnapshot(snapshot)) {
+                LOG.debug("Find snapshot id {}.", nextSnapshotId);
+                DataTableScan.DataFilePlan plan = followUpScanner.getPlan(nextSnapshotId, scan);
+                nextSnapshotId++;
+                return plan;
+            } else {
+                nextSnapshotId++;
+            }
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  static create methods
+    // ------------------------------------------------------------------------
+
+    public static ContinuousDataFileSnapshotEnumerator create(
+            FileStoreTable table, DataTableScan scan, Long nextSnapshotId) {
+        CoreOptions.StartupMode startupMode = table.options().startupMode();
+        Long startupMillis = table.options().logScanTimestampMills();
+        StartingScanner startingScanner;
+        if (startupMode == CoreOptions.StartupMode.FULL) {
+            startingScanner = new FullStartingScanner();
+        } else if (startupMode == CoreOptions.StartupMode.LATEST) {
+            startingScanner = new ContinuousLatestStartingScanner();
+        } else if (startupMode == CoreOptions.StartupMode.COMPACTED) {
+            startingScanner = new CompactedStartingScanner();
+        } else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
+            Preconditions.checkNotNull(
+                    startupMillis,
+                    String.format(
+                            "%s can not be null when you use %s for %s",
+                            CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+                            CoreOptions.StartupMode.FROM_TIMESTAMP,
+                            CoreOptions.SCAN_MODE.key()));
+            startingScanner = new ContinuousFromTimestampStartingScanner(startupMillis);
+        } else {
+            throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
+        }
+
+        CoreOptions.ChangelogProducer changelogProducer = table.options().changelogProducer();
+        FollowUpScanner followUpScanner;
+        if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
+            followUpScanner = new DeltaFollowUpScanner();
+        } else if (changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
+            followUpScanner = new InputChangelogFollowUpScanner();
+        } else if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+            // this change in scan will affect both starting scanner and follow-up scanner
+            scan.withLevel(table.options().numLevels() - 1);
+            followUpScanner = new CompactionChangelogFollowUpScanner();
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unknown changelog producer " + changelogProducer.name());
+        }
+
+        return new ContinuousDataFileSnapshotEnumerator(
+                table.location(), scan, startingScanner, followUpScanner, nextSnapshotId);
+    }
+}
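
A hedged sketch of how a consumer drives this enumerator (it mirrors
ContinuousFileStoreSource and TableStreamingReader from this commit; `table`
is a FileStoreTable, and a null nextSnapshotId means a fresh start):

    ContinuousDataFileSnapshotEnumerator enumerator =
            ContinuousDataFileSnapshotEnumerator.create(table, table.newScan(), null);
    while (true) {
        DataTableScan.DataFilePlan plan = enumerator.enumerate();
        if (plan == null) {
            break; // nothing consumable yet; a real source polls again later
        }
        // hand the plan to a FileStoreSourceSplitGenerator, as in StaticFileStoreSource
    }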
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
new file mode 100644
index 00000000..d598cb34
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a streaming
+ * read.
+ */
+public class ContinuousFromTimestampStartingScanner implements StartingScanner {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ContinuousFromTimestampStartingScanner.class);
+
+    private final long startupMillis;
+
+    public ContinuousFromTimestampStartingScanner(long startupMillis) {
+        this.startupMillis = startupMillis;
+    }
+
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis);
+        if (startingSnapshotId == null) {
+            LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
+            return null;
+        }
+        return new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScanner.java
new file mode 100644
index 00000000..2a0a69d3
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#LATEST} startup mode of a streaming read.
+ */
+public class ContinuousLatestStartingScanner implements StartingScanner {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ContinuousLatestStartingScanner.class);
+
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long startingSnapshotId = snapshotManager.latestSnapshotId();
+        if (startingSnapshotId == null) {
+            LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
+            return null;
+        }
+        return new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+    }
+}
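
Note the contrast with FullStartingScanner: the "latest" scanner returns the
latest snapshot id together with an empty file list, so the first plan carries
no data and the enumerator, which continues from plan.snapshotId + 1, starts
reading with the changes of the following snapshot.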
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
deleted file mode 100644
index 54d201dd..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source.snapshot;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.operation.ScanKind;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-
-/** Abstract class for all {@link SnapshotEnumerator}s which enumerate record related data files. */
-public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DataFileSnapshotEnumerator.class);
-
-    private final SnapshotManager snapshotManager;
-    private final DataTableScan scan;
-    private final CoreOptions.StartupMode startupMode;
-    private @Nullable final Long startupMillis;
-
-    private @Nullable Long nextSnapshotId;
-
-    public DataFileSnapshotEnumerator(
-            Path tablePath,
-            DataTableScan scan,
-            CoreOptions.StartupMode startupMode,
-            @Nullable Long startupMillis,
-            @Nullable Long nextSnapshotId) {
-        this.snapshotManager = new SnapshotManager(tablePath);
-        this.scan = scan;
-        this.startupMode = startupMode;
-        this.startupMillis = startupMillis;
-
-        this.nextSnapshotId = nextSnapshotId;
-    }
-
-    @Override
-    public DataTableScan.DataFilePlan enumerate() {
-        if (nextSnapshotId == null) {
-            return tryFirstEnumerate();
-        } else {
-            return nextEnumerate();
-        }
-    }
-
-    private DataTableScan.DataFilePlan tryFirstEnumerate() {
-        Long startingSnapshotId = snapshotManager.latestSnapshotId();
-        if (startingSnapshotId == null) {
-            LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
-            return null;
-        }
-
-        DataTableScan.DataFilePlan plan;
-        switch (startupMode) {
-            case FULL:
-                plan = scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
-                break;
-            case FROM_TIMESTAMP:
-                Preconditions.checkNotNull(
-                        startupMillis,
-                        String.format(
-                                "%s can not be null when you use %s for %s",
-                                CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
-                                CoreOptions.StartupMode.FROM_TIMESTAMP,
-                                CoreOptions.SCAN_MODE.key()));
-                startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis);
-                plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
-                break;
-            case LATEST:
-                plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
-                break;
-            default:
-                throw new UnsupportedOperationException(
-                        "Unknown log startup mode " + startupMode.name());
-        }
-
-        nextSnapshotId = startingSnapshotId + 1;
-        return plan;
-    }
-
-    private DataTableScan.DataFilePlan nextEnumerate() {
-        while (true) {
-            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
-                LOG.debug(
-                        "Next snapshot id {} does not exist, wait for the snapshot generation.",
-                        nextSnapshotId);
-                return null;
-            }
-
-            Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
-
-            if (shouldReadSnapshot(snapshot)) {
-                LOG.debug("Find snapshot id {}.", nextSnapshotId);
-                DataTableScan.DataFilePlan plan = getPlan(scan.withSnapshot(nextSnapshotId));
-                nextSnapshotId++;
-                return plan;
-            } else {
-                nextSnapshotId++;
-            }
-        }
-    }
-
-    protected abstract boolean shouldReadSnapshot(Snapshot snapshot);
-
-    protected abstract DataTableScan.DataFilePlan getPlan(DataTableScan scan);
-
-    public static DataFileSnapshotEnumerator create(
-            FileStoreTable table, DataTableScan scan, Long nextSnapshotId) {
-        Path location = table.location();
-        CoreOptions.StartupMode startupMode = table.options().startupMode();
-        Long startupMillis = table.options().logScanTimestampMills();
-
-        switch (table.options().changelogProducer()) {
-            case NONE:
-                return new DeltaSnapshotEnumerator(
-                        location, scan, startupMode, startupMillis, nextSnapshotId);
-            case INPUT:
-                return new InputChangelogSnapshotEnumerator(
-                        location, scan, startupMode, startupMillis, nextSnapshotId);
-            case FULL_COMPACTION:
-                return new FullCompactionChangelogSnapshotEnumerator(
-                        location,
-                        scan,
-                        table.options().numLevels() - 1,
-                        startupMode,
-                        startupMillis,
-                        nextSnapshotId);
-            default:
-                throw new UnsupportedOperationException(
-                        "Unknown changelog producer " + table.options().changelogProducer().name());
-        }
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScanner.java
similarity index 63%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScanner.java
index 36642ff8..4173aab0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScanner.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.table.source.snapshot;
 
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.table.source.DataTableScan;
@@ -27,27 +25,16 @@ import org.apache.flink.table.store.table.source.DataTableScan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 /**
- * A {@link DataFileSnapshotEnumerator} which scans incremental changes in {@link
- * Snapshot#deltaManifestList()} for each newly created snapshots.
+ * {@link FollowUpScanner} for {@link
+ * org.apache.flink.table.store.CoreOptions.ChangelogProducer#NONE} changelog producer.
  */
-public class DeltaSnapshotEnumerator extends DataFileSnapshotEnumerator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DeltaSnapshotEnumerator.class);
+public class DeltaFollowUpScanner implements FollowUpScanner {
 
-    public DeltaSnapshotEnumerator(
-            Path tablePath,
-            DataTableScan scan,
-            CoreOptions.StartupMode startupMode,
-            @Nullable Long startupMillis,
-            @Nullable Long nextSnapshotId) {
-        super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(DeltaFollowUpScanner.class);
 
     @Override
-    protected boolean shouldReadSnapshot(Snapshot snapshot) {
+    public boolean shouldScanSnapshot(Snapshot snapshot) {
         if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
             return true;
         }
@@ -60,7 +47,7 @@ public class DeltaSnapshotEnumerator extends DataFileSnapshotEnumerator {
     }
 
     @Override
-    protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
-        return scan.withKind(ScanKind.DELTA).plan();
+    public DataTableScan.DataFilePlan getPlan(long snapshotId, DataTableScan scan) {
+        return scan.withKind(ScanKind.DELTA).withSnapshot(snapshotId).plan();
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FollowUpScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FollowUpScanner.java
new file mode 100644
index 00000000..91d6da66
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FollowUpScanner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+/** Helper class for the follow-up enumeration of a {@link SnapshotEnumerator}. */
+public interface FollowUpScanner {
+
+    boolean shouldScanSnapshot(Snapshot snapshot);
+
+    DataTableScan.DataFilePlan getPlan(long snapshotId, DataTableScan scan);
+}
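
A FollowUpScanner only answers two questions, namely whether a snapshot should
be consumed and what its plan is, so the enumerator that owns it is the one
advancing the snapshot pointer. A hedged sketch of such a driver loop follows
(the real loop lives in ContinuousDataFileSnapshotEnumerator, added by this
commit; the field names and retry behaviour here are assumptions for
illustration):

    import org.apache.flink.table.store.file.Snapshot;
    import org.apache.flink.table.store.file.utils.SnapshotManager;
    import org.apache.flink.table.store.table.source.DataTableScan;
    import org.apache.flink.table.store.table.source.snapshot.FollowUpScanner;

    class FollowUpDriverSketch {
        private final SnapshotManager snapshotManager;
        private final DataTableScan scan;
        private final FollowUpScanner scanner;
        private long nextSnapshotId;

        FollowUpDriverSketch(
                SnapshotManager snapshotManager,
                DataTableScan scan,
                FollowUpScanner scanner,
                long nextSnapshotId) {
            this.snapshotManager = snapshotManager;
            this.scan = scan;
            this.scanner = scanner;
            this.nextSnapshotId = nextSnapshotId;
        }

        /** Returns the next consumable plan, or null when the caller should retry later. */
        DataTableScan.DataFilePlan pollNext() {
            Long latest = snapshotManager.latestSnapshotId();
            while (latest != null && nextSnapshotId <= latest) {
                Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
                long id = nextSnapshotId++;
                if (scanner.shouldScanSnapshot(snapshot)) {
                    return scanner.getPlan(id, scan);
                }
                // not interesting to this scanner (e.g. a COMPACT snapshot for
                // DeltaFollowUpScanner): skip it and inspect the next one
            }
            return null;
        }
    }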
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
new file mode 100644
index 00000000..ccb4a4b4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScanner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StartingScanner} for the {@link org.apache.flink.table.store.CoreOptions.StartupMode#FULL}
+ * startup mode.
+ */
+public class FullStartingScanner implements StartingScanner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FullStartingScanner.class);
+
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long startingSnapshotId = snapshotManager.latestSnapshotId();
+        if (startingSnapshotId == null) {
+            LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
+            return null;
+        }
+        return scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
+    }
+}
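
Returning null here instead of an empty plan is deliberate: it tells the
caller that no snapshot exists yet, so a streaming source keeps polling while
a batch source simply has nothing to read. Usage is a single call; a sketch
mirroring the FullStartingScannerTest added below (illustration only):

    import org.apache.flink.table.store.table.FileStoreTable;
    import org.apache.flink.table.store.table.source.DataTableScan;
    import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;

    class FullStartSketch {
        static void example(FileStoreTable table) {
            DataTableScan.DataFilePlan plan =
                    new FullStartingScanner().getPlan(table.snapshotManager(), table.newScan());
            if (plan == null) {
                // no snapshot committed yet; a streaming source would poll again later
            }
        }
    }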
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScanner.java
similarity index 55%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScanner.java
index fffe71f4..d9b3a397 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScanner.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.table.source.snapshot;
 
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.table.source.DataTableScan;
@@ -27,31 +25,16 @@ import org.apache.flink.table.store.table.source.DataTableScan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 /**
- * A {@link DataFileSnapshotEnumerator} which scans incremental changes in {@link
- * Snapshot#changelogManifestList()} for each newly created snapshot.
- *
- * <p>This enumerator looks for changelog files produced directly from input. It can only be used when
- * {@link CoreOptions#CHANGELOG_PRODUCER} is set to {@link CoreOptions.ChangelogProducer#INPUT}.
+ * {@link FollowUpScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.ChangelogProducer#INPUT} changelog producer.
  */
-public class InputChangelogSnapshotEnumerator extends DataFileSnapshotEnumerator {
+public class InputChangelogFollowUpScanner implements FollowUpScanner {
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(InputChangelogSnapshotEnumerator.class);
-
-    public InputChangelogSnapshotEnumerator(
-            Path tablePath,
-            DataTableScan scan,
-            CoreOptions.StartupMode startupMode,
-            @Nullable Long startupMillis,
-            Long nextSnapshotId) {
-        super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(InputChangelogFollowUpScanner.class);
 
     @Override
-    protected boolean shouldReadSnapshot(Snapshot snapshot) {
+    public boolean shouldScanSnapshot(Snapshot snapshot) {
         if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
             return true;
         }
@@ -64,7 +47,7 @@ public class InputChangelogSnapshotEnumerator extends DataFileSnapshotEnumerator
     }
 
     @Override
-    protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
-        return scan.withKind(ScanKind.CHANGELOG).plan();
+    public DataTableScan.DataFilePlan getPlan(long snapshotId, DataTableScan scan) {
+        return scan.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).plan();
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StartingScanner.java
new file mode 100644
index 00000000..7b658066
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StartingScanner.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+/** Helper class for the first enumeration of a {@link SnapshotEnumerator}. */
+public interface StartingScanner {
+
+    DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan);
+}
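
Because StartingScanner is a single-method interface, adding a new startup
mode is a matter of one small class. A purely hypothetical example (not part
of this commit) that starts from a caller-chosen snapshot id, built only from
calls visible elsewhere in this patch:

    import org.apache.flink.table.store.file.operation.ScanKind;
    import org.apache.flink.table.store.file.utils.SnapshotManager;
    import org.apache.flink.table.store.table.source.DataTableScan;
    import org.apache.flink.table.store.table.source.snapshot.StartingScanner;

    /** Hypothetical: scan the whole table as of a fixed snapshot id. */
    class FromSnapshotIdStartingScanner implements StartingScanner {

        private final long snapshotId;

        FromSnapshotIdStartingScanner(long snapshotId) {
            this.snapshotId = snapshotId;
        }

        @Override
        public DataTableScan.DataFilePlan getPlan(
                SnapshotManager snapshotManager, DataTableScan scan) {
            Long latest = snapshotManager.latestSnapshotId();
            if (latest == null || snapshotId > latest) {
                // requested snapshot has not been produced yet
                return null;
            }
            return scan.withKind(ScanKind.ALL).withSnapshot(snapshotId).plan();
        }
    }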
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
new file mode 100644
index 00000000..f12e9ac1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import javax.annotation.Nullable;
+
+/** {@link SnapshotEnumerator} for batch reads. */
+public class StaticDataFileSnapshotEnumerator implements SnapshotEnumerator {
+
+    private final SnapshotManager snapshotManager;
+    private final DataTableScan scan;
+    private final StartingScanner startingScanner;
+
+    private boolean hasNext;
+
+    private StaticDataFileSnapshotEnumerator(
+            Path tablePath, DataTableScan scan, StartingScanner startingScanner) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.startingScanner = startingScanner;
+
+        this.hasNext = true;
+    }
+
+    @Nullable
+    @Override
+    public DataTableScan.DataFilePlan enumerate() {
+        if (hasNext) {
+            hasNext = false;
+            return startingScanner.getPlan(snapshotManager, scan);
+        } else {
+            return null;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  static create methods
+    // ------------------------------------------------------------------------
+
+    public static StaticDataFileSnapshotEnumerator create(
+            FileStoreTable table, DataTableScan scan) {
+        CoreOptions.StartupMode startupMode = table.options().startupMode();
+        StartingScanner startingScanner;
+        if (startupMode == CoreOptions.StartupMode.FULL
+                || startupMode == CoreOptions.StartupMode.LATEST) {
+            startingScanner = new FullStartingScanner();
+        } else if (startupMode == CoreOptions.StartupMode.COMPACTED) {
+            startingScanner = new CompactedStartingScanner();
+        } else {
+            throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
+        }
+
+        return new StaticDataFileSnapshotEnumerator(table.location(), scan, startingScanner);
+    }
+}
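
Note the batch semantics encoded above: enumerate() hands out the starting
plan exactly once, and LATEST maps to FullStartingScanner, presumably because
for a one-shot read the latest snapshot is the full table. A short consumption
sketch (assuming a populated table, as in the tests):

    import org.apache.flink.table.store.table.FileStoreTable;
    import org.apache.flink.table.store.table.source.DataTableScan;
    import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
    import org.apache.flink.table.store.table.source.snapshot.StaticDataFileSnapshotEnumerator;

    class BatchEnumerateSketch {
        static DataTableScan.DataFilePlan readOnce(FileStoreTable table) {
            SnapshotEnumerator enumerator =
                    StaticDataFileSnapshotEnumerator.create(table, table.newScan());
            // first call: the starting plan (or null when the table has no snapshot)
            DataTableScan.DataFilePlan plan = enumerator.enumerate();
            // every further call returns null: batch reads enumerate exactly once
            return plan;
        }
    }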
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index aae4a784..34ab1a17 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -38,7 +38,9 @@ import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.table.store.table.source.snapshot.InputChangelogSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
+import org.apache.flink.table.store.table.source.snapshot.InputChangelogFollowUpScanner;
 import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
 import org.apache.flink.table.store.utils.CompatibilityTestUtils;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -373,8 +375,12 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                                 Collections.singletonList("-D 2|10|301|binary|varbinary")));
 
         SnapshotEnumerator enumerator =
-                new InputChangelogSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, 1L);
+                new ContinuousDataFileSnapshotEnumerator(
+                        tablePath,
+                        table.newScan(),
+                        new FullStartingScanner(),
+                        new InputChangelogFollowUpScanner(),
+                        1L);
 
         FunctionWithException<Integer, Void, Exception> assertNextSnapshot =
                 i -> {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 3bde3586..8965e97f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
@@ -43,7 +42,6 @@ import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.DataSplit;
-import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -60,7 +58,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -72,7 +69,6 @@ import java.util.stream.Collectors;
 import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
 import static org.apache.flink.table.store.CoreOptions.COMPACTION_MAX_FILE_NUM;
-import static org.apache.flink.table.store.CoreOptions.READ_COMPACTED;
 import static org.apache.flink.table.store.CoreOptions.WRITE_COMPACTION_SKIP;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -320,58 +316,6 @@ public abstract class FileStoreTableTestBase {
         }
     }
 
-    @Test
-    public void testReadCompactedSnapshot() throws Exception {
-        writeCompactData();
-        FileStoreTable table = createFileStoreTable(conf -> conf.set(READ_COMPACTED, true));
-
-        DataTableScan.DataFilePlan plan = table.newScan().plan();
-        Snapshot compactedSnapshot = table.snapshotManager().snapshot(plan.snapshotId);
-        Iterator<Snapshot> snapshotIterator = table.snapshotManager().snapshots();
-        while (snapshotIterator.hasNext()) {
-            Snapshot snapshot = snapshotIterator.next();
-            if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
-                assertThat(snapshot.id()).isLessThanOrEqualTo(compactedSnapshot.id());
-            }
-        }
-
-        assertThat(compactedSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
-        List<Split> splits = plan.splits();
-        TableRead read = table.newRead();
-        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING).size())
-                .isGreaterThan(0);
-        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING).size())
-                .isGreaterThan(0);
-
-        List<Split> compactedSplits =
-                table.newScan().withSnapshot(compactedSnapshot.id()).plan().splits();
-        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(getResult(read, compactedSplits, binaryRow(1), 0, BATCH_ROW_TO_STRING));
-        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(getResult(read, compactedSplits, binaryRow(2), 0, BATCH_ROW_TO_STRING));
-    }
-
-    private void writeCompactData() throws Exception {
-        final int batchCount = 10;
-        final int countPerBatch = 10000;
-        FileStoreTable table =
-                createFileStoreTable(
-                        conf ->
-                                conf.set(CoreOptions.BUCKET, 1)
-                                        .set(CoreOptions.READ_COMPACTED, true)
-                                        .set(CoreOptions.COMPACTION_MAX_FILE_NUM, 10));
-        TableWrite write = table.newWrite("user");
-        TableCommit commit = table.newCommit("user");
-
-        for (int i = 0; i < batchCount; i++) {
-            for (int j = 0; j < countPerBatch; j++) {
-                write.write(rowData(j % 2 + 1, i * j, j * 10L));
-            }
-            commit.commit(i, write.prepareCommit(true, i));
-        }
-        write.close();
-    }
-
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScannerTest.java
new file mode 100644
index 00000000..b7bf76e3
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactedStartingScannerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CompactedStartingScanner}. */
+public class CompactedStartingScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 20, 201L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
+
+        CompactedStartingScanner scanner = new CompactedStartingScanner();
+        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(getResult(table.newRead(), plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testNoSnapshot() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        CompactedStartingScanner scanner = new CompactedStartingScanner();
+        assertThat(scanner.getPlan(snapshotManager, table.newScan())).isNull();
+    }
+
+    @Test
+    public void testNoCompactSnapshot() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
+
+        CompactedStartingScanner scanner = new CompactedStartingScanner();
+        assertThat(scanner.getPlan(snapshotManager, table.newScan())).isNull();
+
+        write.close();
+        commit.close();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
new file mode 100644
index 00000000..10f22cb0
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CompactionChangelogFollowUpScanner}. */
+public class CompactionChangelogFollowUpScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 30, 300L));
+        write.write(rowData(1, 40, 401L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
+
+        DataTableScan scan = table.newScan().withLevel(table.options().numLevels() - 1);
+        TableRead read = table.newRead();
+        CompactionChangelogFollowUpScanner scanner = new CompactionChangelogFollowUpScanner();
+
+        Snapshot snapshot = snapshotManager.snapshot(1);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
+
+        snapshot = snapshotManager.snapshot(2);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
+
+        snapshot = snapshotManager.snapshot(3);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+        DataTableScan.DataFilePlan plan = scanner.getPlan(3, scan);
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|200", "+I 1|30|300"));
+
+        snapshot = snapshotManager.snapshot(4);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
+
+        snapshot = snapshotManager.snapshot(5);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+        plan = scanner.getPlan(5, scan);
+        assertThat(plan.snapshotId).isEqualTo(5);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(
+                        Arrays.asList("-U 1|10|102", "+U 1|10|103", "-D 1|30|300", "+I 1|40|401"));
+
+        write.close();
+        commit.close();
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
+        return createFileStoreTable(conf);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumeratorTest.java
similarity index 59%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumeratorTest.java
index 3252d022..666c22c3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumeratorTest.java
@@ -33,18 +33,17 @@ import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link DeltaSnapshotEnumerator}. */
-public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestBase {
+/** Tests for {@link ContinuousDataFileSnapshotEnumerator}. */
+public class ContinuousDataFileSnapshotEnumeratorTest extends SnapshotEnumeratorTestBase {
 
     @Test
-    public void testFullStartupMode() throws Exception {
+    public void testEnumerate() throws Exception {
         FileStoreTable table = createFileStoreTable();
         TableRead read = table.newRead();
         TableWrite write = table.newWrite(commitUser);
         TableCommit commit = table.newCommit(commitUser);
         SnapshotEnumerator enumerator =
-                new DeltaSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, null);
+                ContinuousDataFileSnapshotEnumerator.create(table, table.newScan(), null);
 
         // first call without any snapshot, should return null
         assertThat(enumerator.enumerate()).isNull();
@@ -101,134 +100,76 @@ public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestB
     }
 
     @Test
-    public void testFromTimestampStartupMode() throws Exception {
-        FileStoreTable table = createFileStoreTable();
+    public void testFullCompactionChangelog() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
+
+        FileStoreTable table = createFileStoreTable(conf);
         TableRead read = table.newRead();
         TableWrite write = table.newWrite(commitUser);
         TableCommit commit = table.newCommit(commitUser);
+        SnapshotEnumerator enumerator =
+                ContinuousDataFileSnapshotEnumerator.create(table, table.newScan(), null);
+
+        // first call without any snapshot, should return null
+        assertThat(enumerator.enumerate()).isNull();
 
+        // write base data
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 20, 200L));
         write.write(rowData(1, 40, 400L));
         commit.commit(0, write.prepareCommit(true, 0));
 
-        // log current time millis, we'll start from here
-        Thread.sleep(50);
-        long startMillis = System.currentTimeMillis();
-        Thread.sleep(50);
-
         write.write(rowData(1, 10, 101L));
         write.write(rowData(1, 30, 300L));
         write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
         commit.commit(1, write.prepareCommit(true, 1));
 
+        // some more records
         write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
         write.write(rowData(1, 20, 201L));
         write.write(rowData(1, 10, 102L));
         write.write(rowData(1, 40, 400L));
         commit.commit(2, write.prepareCommit(true, 2));
 
-        // first call with snapshot, should return empty plan
-        SnapshotEnumerator enumerator =
-                new DeltaSnapshotEnumerator(
-                        tablePath,
-                        table.newScan(),
-                        CoreOptions.StartupMode.FROM_TIMESTAMP,
-                        startMillis,
-                        null);
-
+        // first call with snapshot, should return full compacted records from 3rd commit
         DataTableScan.DataFilePlan plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(1);
-        assertThat(plan.splits()).isEmpty();
-
-        // first incremental call, should return incremental records from 2nd commit
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(2);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|30|300", "-D 1|40|400"));
-
-        // second incremental call, should return incremental records from 3rd commit
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(plan.snapshotId).isEqualTo(4);
         assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
 
-        // no new snapshots
+        // incremental call without new snapshots, should return null
         assertThat(enumerator.enumerate()).isNull();
 
-        // more incremental records
+        // write incremental data
         write.write(rowData(1, 10, 103L));
         write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
         write.write(rowData(1, 50, 500L));
         commit.commit(3, write.prepareCommit(true, 3));
 
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(4);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
-
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.close();
-        commit.close();
-    }
-
-    @Test
-    public void testLatestStartupMode() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        TableRead read = table.newRead();
-        TableWrite write = table.newWrite(commitUser);
-        TableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(1, 10, 101L));
-        write.write(rowData(1, 30, 300L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        SnapshotEnumerator enumerator =
-                new DeltaSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.StartupMode.LATEST, null, null);
-
-        DataTableScan.DataFilePlan plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(2);
-        assertThat(plan.splits()).isEmpty();
-
+        // no new compact snapshots, should return null
         assertThat(enumerator.enumerate()).isNull();
 
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
-        write.write(rowData(1, 20, 201L));
-        write.write(rowData(1, 10, 102L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(2, write.prepareCommit(true, 2));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(4, write.prepareCommit(true, 4));
 
+        // full compaction done, read new changelog
         plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(plan.snapshotId).isEqualTo(6);
         assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.write(rowData(1, 10, 103L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.write(rowData(1, 50, 500L));
-        commit.commit(3, write.prepareCommit(true, 3));
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "-U 1|10|101",
+                                "+U 1|10|103",
+                                "-U 1|20|200",
+                                "+U 1|20|201",
+                                "+I 1|50|500"));
 
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(4);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
+        // no more new snapshots, should return null
         assertThat(enumerator.enumerate()).isNull();
 
         write.close();
         commit.close();
     }
-
-    @Override
-    protected Configuration getConf() {
-        return new Configuration();
-    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
new file mode 100644
index 00000000..eb68eb6c
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ContinuousFromTimestampStartingScanner}. */
+public class ContinuousFromTimestampStartingScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // wait for a little while
+        Thread.sleep(50);
+
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 20, 201L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(3);
+        // snapshot 3 should not be read
+        long timestamp = snapshotManager.snapshot(3).timeMillis();
+
+        ContinuousFromTimestampStartingScanner scanner =
+                new ContinuousFromTimestampStartingScanner(timestamp);
+        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testNoSnapshot() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        ContinuousFromTimestampStartingScanner scanner =
+                new ContinuousFromTimestampStartingScanner(System.currentTimeMillis());
+        assertThat(scanner.getPlan(snapshotManager, table.newScan())).isNull();
+    }
+
+    @Test
+    public void testNoSnapshotBeforeTimestamp() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
+        // snapshot 1 should not be read
+        long timestamp = snapshotManager.snapshot(1).timeMillis();
+
+        ContinuousFromTimestampStartingScanner scanner =
+                new ContinuousFromTimestampStartingScanner(timestamp);
+        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
+        assertThat(plan.snapshotId).isEqualTo(0);
+        assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+
+        write.close();
+        commit.close();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScannerTest.java
new file mode 100644
index 00000000..d4917792
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousLatestStartingScannerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ContinuousLatestStartingScanner}. */
+public class ContinuousLatestStartingScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
+
+        ContinuousLatestStartingScanner scanner = new ContinuousLatestStartingScanner();
+        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testNoSnapshot() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        ContinuousLatestStartingScanner scanner = new ContinuousLatestStartingScanner();
+        assertThat(scanner.getPlan(snapshotManager, table.newScan())).isNull();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScannerTest.java
new file mode 100644
index 00000000..080812a2
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaFollowUpScannerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DeltaFollowUpScanner}. */
+public class DeltaFollowUpScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(3);
+
+        DataTableScan scan = table.newScan();
+        TableRead read = table.newRead();
+        DeltaFollowUpScanner scanner = new DeltaFollowUpScanner();
+
+        Snapshot snapshot = snapshotManager.snapshot(1);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+        DataTableScan.DataFilePlan plan = scanner.getPlan(1, scan);
+        assertThat(plan.snapshotId).isEqualTo(1);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 1|40|400"));
+
+        snapshot = snapshotManager.snapshot(2);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+        plan = scanner.getPlan(2, scan);
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|30|300", "-D 1|40|400"));
+
+        snapshot = snapshotManager.snapshot(3);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
+
+        write.close();
+        commit.close();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
deleted file mode 100644
index 9a3e890f..00000000
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source.snapshot;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.types.RowKind;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link FullCompactionChangelogSnapshotEnumerator}. */
-public class FullCompactionChangelogSnapshotEnumeratorTest
-        extends DataFileSnapshotEnumeratorTestBase {
-
-    @Test
-    public void testFullStartupMode() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        TableRead read = table.newRead();
-        TableWrite write = table.newWrite(commitUser);
-        TableCommit commit = table.newCommit(commitUser);
-        SnapshotEnumerator enumerator =
-                new FullCompactionChangelogSnapshotEnumerator(
-                        tablePath,
-                        table.newScan(),
-                        table.options().numLevels() - 1,
-                        CoreOptions.StartupMode.FULL,
-                        null,
-                        null);
-
-        // first call without any snapshot, should return null
-        assertThat(enumerator.enumerate()).isNull();
-
-        // write base data
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(1, 10, 101L));
-        write.write(rowData(1, 30, 300L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.compact(binaryRow(1), 0, true);
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        // some more records
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
-        write.write(rowData(1, 20, 201L));
-        write.write(rowData(1, 10, 102L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(2, write.prepareCommit(true, 2));
-
-        // first call with snapshot, should return full compacted records from 3rd commit
-        DataTableScan.DataFilePlan plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(4);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
-
-        // incremental call without new snapshots, should return null
-        assertThat(enumerator.enumerate()).isNull();
-
-        // write incremental data
-        write.write(rowData(1, 10, 103L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.write(rowData(1, 50, 500L));
-        commit.commit(3, write.prepareCommit(true, 3));
-
-        // no new compact snapshots, should return null
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.compact(binaryRow(1), 0, true);
-        commit.commit(4, write.prepareCommit(true, 4));
-
-        // full compaction done, read new changelog
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(6);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(
-                        Arrays.asList(
-                                "-U 1|10|101",
-                                "+U 1|10|103",
-                                "-U 1|20|200",
-                                "+U 1|20|201",
-                                "+I 1|50|500"));
-
-        // no more new snapshots, should return null
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.close();
-        commit.close();
-    }
-
-    @Test
-    public void testFromTimestampStartupMode() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        TableRead read = table.newRead();
-        TableWrite write = table.newWrite(commitUser);
-        TableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(1, 10, 101L));
-        write.write(rowData(1, 30, 300L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.compact(binaryRow(1), 0, true);
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        // record the current time in millis; reading will start from here
-        Thread.sleep(50);
-        long startMillis = System.currentTimeMillis();
-        Thread.sleep(50);
-
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
-        write.write(rowData(1, 20, 201L));
-        write.write(rowData(1, 10, 102L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(2, write.prepareCommit(true, 2));
-
-        SnapshotEnumerator enumerator =
-                new FullCompactionChangelogSnapshotEnumerator(
-                        tablePath,
-                        table.newScan(),
-                        table.options().numLevels() - 1,
-                        CoreOptions.StartupMode.FROM_TIMESTAMP,
-                        startMillis,
-                        null);
-
-        // first call, should return empty plan
-        DataTableScan.DataFilePlan plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(3);
-        assertThat(plan.splits()).isEmpty();
-
-        // first incremental call, no new compact snapshot, should be null
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.write(rowData(1, 10, 103L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.write(rowData(1, 50, 500L));
-        write.compact(binaryRow(1), 0, true);
-        commit.commit(3, write.prepareCommit(true, 3));
-
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(6);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(
-                        Arrays.asList(
-                                "-U 1|10|101",
-                                "+U 1|10|103",
-                                "-U 1|20|200",
-                                "+U 1|20|201",
-                                "+I 1|50|500"));
-
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.close();
-        commit.close();
-    }
-
-    @Test
-    public void testLatestStartupMode() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        TableRead read = table.newRead();
-        TableWrite write = table.newWrite(commitUser);
-        TableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(1, 10, 101L));
-        write.write(rowData(1, 30, 300L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.compact(binaryRow(1), 0, true);
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        SnapshotEnumerator enumerator =
-                new FullCompactionChangelogSnapshotEnumerator(
-                        tablePath,
-                        table.newScan(),
-                        table.options().numLevels() - 1,
-                        CoreOptions.StartupMode.LATEST,
-                        null,
-                        null);
-
-        // first call, should be empty plan
-        DataTableScan.DataFilePlan plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(3);
-        assertThat(plan.splits()).isEmpty();
-
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
-        write.write(rowData(1, 20, 201L));
-        write.write(rowData(1, 10, 102L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(2, write.prepareCommit(true, 2));
-
-        // first incremental call, no new compact snapshot, should be null
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.write(rowData(1, 10, 103L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.write(rowData(1, 50, 500L));
-        write.compact(binaryRow(1), 0, true);
-        commit.commit(3, write.prepareCommit(true, 3));
-
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(6);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(
-                        Arrays.asList(
-                                "-U 1|10|101",
-                                "+U 1|10|103",
-                                "-U 1|20|200",
-                                "+U 1|20|201",
-                                "+I 1|50|500"));
-
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.close();
-        commit.close();
-    }
-
-    @Override
-    protected Configuration getConf() {
-        Configuration conf = new Configuration();
-        conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
-        return conf;
-    }
-}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScannerTest.java
new file mode 100644
index 00000000..c8ca7c56
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullStartingScannerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FullStartingScanner}. */
+public class FullStartingScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
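+        // prepare two commits; each one creates a new snapshot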
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
+
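+        // FullStartingScanner plans the latest snapshot with all records merged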
+        FullStartingScanner scanner = new FullStartingScanner();
+        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, table.newScan());
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(table.newRead(), plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testNoSnapshot() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        FullStartingScanner scanner = new FullStartingScanner();
+        assertThat(scanner.getPlan(snapshotManager, table.newScan())).isNull();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScannerTest.java
new file mode 100644
index 00000000..aec46df2
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogFollowUpScannerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link InputChangelogFollowUpScanner}. */
+public class InputChangelogFollowUpScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
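+        // the second commit triggers a compaction, so it produces two snapshots (APPEND + COMPACT)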
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(3);
+
+        DataTableScan scan = table.newScan();
+        TableRead read = table.newRead();
+        InputChangelogFollowUpScanner scanner = new InputChangelogFollowUpScanner();
+
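+        // APPEND snapshots carry the input changelog and must be scanned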
+        Snapshot snapshot = snapshotManager.snapshot(1);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+        DataTableScan.DataFilePlan plan = scanner.getPlan(1, scan);
+        assertThat(plan.snapshotId).isEqualTo(1);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 1|40|400"));
+
+        snapshot = snapshotManager.snapshot(2);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+        plan = scanner.getPlan(2, scan);
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(
+                        Arrays.asList("+I 1|10|101", "+I 1|30|300", "+I 1|10|102", "-D 1|40|400"));
+
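+        // COMPACT snapshots produce no input changelog and must be skipped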
+        snapshot = snapshotManager.snapshot(3);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
+
+        write.close();
+        commit.close();
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
+        return createFileStoreTable(conf);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
deleted file mode 100644
index 2c2fc091..00000000
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source.snapshot;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.types.RowKind;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link InputChangelogSnapshotEnumerator}. */
-public class InputChangelogSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestBase {
-
-    @Test
-    public void testFullStartupMode() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        TableRead read = table.newRead();
-        TableWrite write = table.newWrite(commitUser);
-        TableCommit commit = table.newCommit(commitUser);
-        SnapshotEnumerator enumerator =
-                new InputChangelogSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, null);
-
-        // first call without any snapshot, should return null
-        assertThat(enumerator.enumerate()).isNull();
-
-        // write base data
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(1, 10, 101L));
-        write.write(rowData(1, 30, 300L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        // first call with snapshot, should return complete records from 2nd commit
-        DataTableScan.DataFilePlan plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(2);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
-
-        // incremental call without new snapshots, should return null
-        assertThat(enumerator.enumerate()).isNull();
-
-        // write incremental data
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
-        write.write(rowData(1, 20, 201L));
-        write.write(rowData(1, 10, 102L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(2, write.prepareCommit(true, 2));
-
-        write.write(rowData(1, 10, 103L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.write(rowData(1, 50, 500L));
-        commit.commit(3, write.prepareCommit(true, 3));
-
-        // first incremental call, should return incremental records from 3rd commit
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(3);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(
-                        Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
-
-        // second incremental call, should return incremental records from 4th commit
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(4);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
-
-        // no more new snapshots, should return null
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.close();
-        commit.close();
-    }
-
-    @Test
-    public void testFromTimestampStartupMode() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        TableRead read = table.newRead();
-        TableWrite write = table.newWrite(commitUser);
-        TableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        // record the current time in millis; reading will start from here
-        Thread.sleep(50);
-        long startMillis = System.currentTimeMillis();
-        Thread.sleep(50);
-
-        write.write(rowData(1, 10, 101L));
-        write.write(rowData(1, 30, 300L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
-        write.write(rowData(1, 20, 201L));
-        write.write(rowData(1, 10, 102L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(2, write.prepareCommit(true, 2));
-
-        // first call with snapshot, should return empty plan
-        SnapshotEnumerator enumerator =
-                new InputChangelogSnapshotEnumerator(
-                        tablePath,
-                        table.newScan(),
-                        CoreOptions.StartupMode.FROM_TIMESTAMP,
-                        startMillis,
-                        null);
-
-        DataTableScan.DataFilePlan plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(1);
-        assertThat(plan.splits()).isEmpty();
-
-        // first incremental call, should return incremental records from 2nd commit
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(2);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|30|300", "-D 1|40|400"));
-
-        // second incremental call, should return incremental records from 3rd commit
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(3);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(
-                        Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
-
-        // no new snapshots
-        assertThat(enumerator.enumerate()).isNull();
-
-        // more incremental records
-        write.write(rowData(1, 10, 103L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.write(rowData(1, 50, 500L));
-        commit.commit(3, write.prepareCommit(true, 3));
-
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(4);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
-
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.close();
-        commit.close();
-    }
-
-    @Test
-    public void testLatestStartupMode() throws Exception {
-        FileStoreTable table = createFileStoreTable();
-        TableRead read = table.newRead();
-        TableWrite write = table.newWrite(commitUser);
-        TableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(1, 10, 101L));
-        write.write(rowData(1, 30, 300L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        SnapshotEnumerator enumerator =
-                new InputChangelogSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.StartupMode.LATEST, null, null);
-
-        DataTableScan.DataFilePlan plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(2);
-        assertThat(plan.splits()).isEmpty();
-
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
-        write.write(rowData(1, 20, 201L));
-        write.write(rowData(1, 10, 102L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(2, write.prepareCommit(true, 2));
-
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(3);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(
-                        Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.write(rowData(1, 10, 103L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.write(rowData(1, 50, 500L));
-        commit.commit(3, write.prepareCommit(true, 3));
-
-        plan = enumerator.enumerate();
-        assertThat(plan.snapshotId).isEqualTo(4);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
-        assertThat(enumerator.enumerate()).isNull();
-
-        write.close();
-        commit.close();
-    }
-
-    @Override
-    protected Configuration getConf() {
-        Configuration conf = new Configuration();
-        conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
-        return conf;
-    }
-}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java
similarity index 94%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java
index 916df0cc..9a7f628b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java
@@ -50,11 +50,11 @@ import java.util.List;
 import java.util.UUID;
 
 /**
- * Base test class for {@link DataFileSnapshotEnumerator}.
+ * Base test class for {@link SnapshotEnumerator} and related classes.
  *
  * <p>TODO: merge this class with FileStoreTableTestBase.
  */
-public abstract class DataFileSnapshotEnumeratorTestBase {
+public abstract class SnapshotEnumeratorTestBase {
 
     private static final RowType ROW_TYPE =
             RowType.of(
@@ -115,7 +115,10 @@ public abstract class DataFileSnapshotEnumeratorTestBase {
     }
 
     protected FileStoreTable createFileStoreTable() throws Exception {
-        Configuration conf = getConf();
+        return createFileStoreTable(new Configuration());
+    }
+
+    protected FileStoreTable createFileStoreTable(Configuration conf) throws Exception {
         SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema tableSchema =
                 schemaManager.commitNewVersion(
@@ -127,6 +130,4 @@ public abstract class DataFileSnapshotEnumeratorTestBase {
                                 ""));
         return FileStoreTableFactory.create(tablePath, tableSchema, conf);
     }
-
-    protected abstract Configuration getConf();
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumeratorTest.java
new file mode 100644
index 00000000..a56efc33
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumeratorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StaticDataFileSnapshotEnumerator}. */
+public class StaticDataFileSnapshotEnumeratorTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testEnumerate() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+        SnapshotEnumerator enumerator =
+                StaticDataFileSnapshotEnumerator.create(table, table.newScan());
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
+
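+        // the static enumerator plans the latest snapshot at the time enumerate() is called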
+        DataTableScan.DataFilePlan plan = enumerator.enumerate();
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(table.newRead(), plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
+
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 30, 301L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
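+        // a static enumerator only produces one plan; new snapshots are not picked up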
+        assertThat(enumerator.enumerate()).isNull();
+
+        write.close();
+        commit.close();
+    }
+}