You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/26 03:14:17 UTC

[flink-table-store] branch master updated: [FLINK-30247] Introduce time travel reading for table store

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e9cf0dd [FLINK-30247] Introduce time travel reading for table store
7e9cf0dd is described below

commit 7e9cf0dd907c84b67850503ff61e27944363d06b
Author: shammon <zj...@gmail.com>
AuthorDate: Mon Dec 26 11:14:11 2022 +0800

    [FLINK-30247] Introduce time travel reading for table store
    
    This closes #410.
---
 docs/content/docs/how-to/querying-tables.md        | 11 ++-
 .../shortcodes/generated/core_configuration.html   |  8 +-
 .../store/connector/BatchFileStoreITCase.java      | 90 ++++++++++++++++++++++
 .../store/connector/ContinuousFileStoreITCase.java | 31 ++++++++
 .../org/apache/flink/table/store/CoreOptions.java  | 75 ++++++++++++++----
 .../table/store/file/utils/SnapshotManager.java    | 17 ++++
 .../ContinuousDataFileSnapshotEnumerator.java      | 12 ++-
 .../ContinuousFromSnapshotStartingScanner.java     | 51 ++++++++++++
 .../snapshot/StaticDataFileSnapshotEnumerator.java | 21 +++++
 .../StaticFromSnapshotStartingScanner.java         | 44 +++++++++++
 .../StaticFromTimestampStartingScanner.java        | 55 +++++++++++++
 11 files changed, 393 insertions(+), 22 deletions(-)

diff --git a/docs/content/docs/how-to/querying-tables.md b/docs/content/docs/how-to/querying-tables.md
index 39140e3c..d54edd05 100644
--- a/docs/content/docs/how-to/querying-tables.md
+++ b/docs/content/docs/how-to/querying-tables.md
@@ -44,7 +44,7 @@ By specifying the `scan.mode` table property, users can specify where and how Ta
 <tr>
 <td>default</td>
 <td colspan="2">
-The default scan mode. Determines actual scan mode according to other table properties. If "scan.timestamp-millis" is set the actual scan mode will be "from-timestamp". Otherwise the actual scan mode will be "latest-full".
+The default scan mode. Determines actual scan mode according to other table properties. If "scan.timestamp-millis" is set the actual scan mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual scan mode will be "from-snapshot". Otherwise the actual scan mode will be "latest-full".
 </td>
 </tr>
 <tr>
@@ -72,9 +72,14 @@ Produces the snapshot after the latest compaction on the table upon first startu
 </tr>
 <tr>
 <td>from-timestamp</td>
-<td>Unsupported</td>
+<td>Produces a snapshot earlier than or equal to the timestamp specified by "scan.timestamp-millis".</td>
 <td>Continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning.</td>
 </tr>
+<tr>
+<td>from-snapshot</td>
+<td>Produces a snapshot specified by "scan.snapshot-id".</td>
+<td>Continuously reads changes starting from a snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning.</td>
+</tr>
 </tbody>
 </table>
 
@@ -179,4 +184,4 @@ SELECT * FROM MyTable$audit_log;
 +------------------+-----------------+-----------------+
 3 rows in set
 */
-```
+```
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 369aea41..069444ad 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -192,7 +192,13 @@
             <td><h5>scan.mode</h5></td>
             <td style="word-wrap: break-word;">default</td>
             <td><p>Enum</p></td>
-            <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just pro [...]
+            <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon f [...]
+        </tr>
+        <tr>
+            <td><h5>scan.snapshot-id</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>Optional snapshot id used in case of "from-snapshot" scan mode</td>
         </tr>
         <tr>
             <td><h5>scan.timestamp-millis</h5></td>
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index 131ddd99..b9560afe 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.connector;
 
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
@@ -26,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for batch file store. */
 public class BatchFileStoreITCase extends FileStoreTableITCase {
@@ -50,4 +52,92 @@ public class BatchFileStoreITCase extends FileStoreTableITCase {
         assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted-full') */"))
                 .isEmpty();
     }
+
+    /**
+     * Checks batch time travel through both 'scan.snapshot-id' and 'scan.timestamp-millis', and
+     * that conflicting option combinations are rejected.
+     */
+    @Test
+    public void testTimeTravelRead() throws InterruptedException {
+        // Four inserts; the snapshot-id assertions below presumably rely on each INSERT
+        // committing exactly one snapshot (ids 1..4) -- TODO confirm.
+        // timeN is captured right after the N-th insert returns.
+        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
+        long time1 = System.currentTimeMillis();
+
+        // Short pause before the next write, presumably so its commit time is later than time1.
+        Thread.sleep(10);
+        batchSql("INSERT INTO T VALUES (3, 33, 333), (4, 44, 444)");
+        long time2 = System.currentTimeMillis();
+
+        Thread.sleep(10);
+        batchSql("INSERT INTO T VALUES (5, 55, 555), (6, 66, 666)");
+        long time3 = System.currentTimeMillis();
+
+        Thread.sleep(10);
+        batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)");
+
+        // Reading snapshot 1 only sees the rows of the first insert.
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */"))
+                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
+
+        // Snapshot id 0 yields no rows (presumably because it is below the earliest snapshot).
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='0') */")).isEmpty();
+
+        // Reading by timestamp time1 must give the same result as reading snapshot 1.
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
+                                        time1)))
+                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
+
+        // Snapshot 2 / time2: rows of the first two inserts.
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='2') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444));
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
+                                        time2)))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444));
+
+        // Snapshot 3 / time3: rows of the first three inserts; the fourth insert stays invisible.
+        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444),
+                        Row.of(5, 55, 555),
+                        Row.of(6, 66, 666));
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
+                                        time3)))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444),
+                        Row.of(5, 55, 555),
+                        Row.of(6, 66, 666));
+
+        // Setting both a timestamp and a snapshot id is a conflict and must be rejected.
+        assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        String.format(
+                                                "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s', 'scan.snapshot-id'='1') */",
+                                                time3)))
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "%s must be null when you set %s",
+                        CoreOptions.SCAN_SNAPSHOT_ID.key(),
+                        CoreOptions.SCAN_TIMESTAMP_MILLIS.key());
+
+        // A snapshot id combined with an explicit 'scan.mode'='full' is rejected as well; the
+        // expected message reports the mode as latest-full.
+        assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='full', 'scan.snapshot-id'='1') */"))
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "%s must be null when you use latest-full for scan.mode",
+                        CoreOptions.SCAN_SNAPSHOT_ID.key());
+    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index 9195b9c6..829ed6ca 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -215,6 +215,37 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase {
                 .hasMessageContaining("Unable to create a source for reading table");
     }
 
+    /**
+     * Checks streaming reads that start from a snapshot given by 'scan.snapshot-id', and that the
+     * option is rejected when combined with 'scan.mode'='latest'.
+     */
+    @Test
+    public void testConfigureStartupSnapshot() throws Exception {
+        // Configure 'scan.snapshot-id' without 'scan.mode'.
+        // The streaming query is started before any data is written.
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1));
+        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+        // Only the first two records are collected; they must be the rows of the first insert.
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+        iterator.close();
+
+        // Start from earliest snapshot
+        // (snapshot id 0 is expected to behave like the earliest snapshot here).
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 0));
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+        iterator.close();
+
+        // Configure 'scan.snapshot-id' with 'scan.mode=latest'.
+        // This combination is invalid, so creating the source must fail.
+        assertThatThrownBy(
+                        () ->
+                                streamSqlIter(
+                                        "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
+                                        0))
+                .hasMessageContaining("Unable to create a source for reading table");
+    }
+
     @Test
     public void testIgnoreOverwrite() throws TimeoutException {
         BlockingIterator<Row, Row> iterator =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 280a1ab2..22974af1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -340,6 +340,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Optional timestamp used in case of \"from-timestamp\" scan mode.");
 
+    public static final ConfigOption<Long> SCAN_SNAPSHOT_ID =
+            ConfigOptions.key("scan.snapshot-id")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional snapshot id used in case of \"from-snapshot\" scan mode");
+
     public static final ConfigOption<Duration> LOG_RETENTION =
             ConfigOptions.key("log.retention")
                     .durationType()
@@ -543,6 +550,8 @@ public class CoreOptions implements Serializable {
         if (mode == StartupMode.DEFAULT) {
             if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
                 return StartupMode.FROM_TIMESTAMP;
+            } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()) {
+                return StartupMode.FROM_SNAPSHOT;
             } else {
                 return StartupMode.LATEST_FULL;
             }
@@ -553,10 +562,14 @@ public class CoreOptions implements Serializable {
         }
     }
 
-    public Long logScanTimestampMills() {
+    public Long scanTimestampMills() {
         return options.get(SCAN_TIMESTAMP_MILLIS);
     }
 
+    /**
+     * Returns the value configured for {@link #SCAN_SNAPSHOT_ID}. The option has no default value
+     * and callers null-check the result, so {@code null} is to be read as "not configured".
+     */
+    public Long scanSnapshotId() {
+        return options.get(SCAN_SNAPSHOT_ID);
+    }
+
     public Duration changelogProducerFullCompactionTriggerInterval() {
         return options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL);
     }
@@ -605,7 +618,8 @@ public class CoreOptions implements Serializable {
         DEFAULT(
                 "default",
                 "Determines actual startup mode according to other table properties. "
-                        + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\". "
+                        + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\", "
+                        + "and if \"scan.snapshot-id\" is set the actual startup mode will be \"from-snapshot\". "
                         + "Otherwise the actual startup mode will be \"latest-full\"."),
 
         LATEST_FULL(
@@ -634,7 +648,15 @@ public class CoreOptions implements Serializable {
                 "For streaming sources, continuously reads changes "
                         + "starting from timestamp specified by \"scan.timestamp-millis\", "
                         + "without producing a snapshot at the beginning. "
-                        + "Unsupported for batch sources.");
+                        + "For batch sources, produces a snapshot at timestamp specified by \"scan.timestamp-millis\" "
+                        + "but does not read new changes."),
+
+        FROM_SNAPSHOT(
+                "from-snapshot",
+                "For streaming sources, continuously reads changes "
+                        + "starting from snapshot specified by \"scan.snapshot-id\", "
+                        + "without producing a snapshot at the beginning. For batch sources, "
+                        + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes.");
 
         private final String value;
         private final String description;
@@ -766,21 +788,14 @@ public class CoreOptions implements Serializable {
     public static void validateTableSchema(TableSchema schema) {
         CoreOptions options = new CoreOptions(schema.options());
         if (options.startupMode() == StartupMode.FROM_TIMESTAMP) {
-            Preconditions.checkArgument(
-                    options.logScanTimestampMills() != null,
-                    String.format(
-                            "%s can not be null when you use %s for %s",
-                            SCAN_TIMESTAMP_MILLIS.key(),
-                            StartupMode.FROM_TIMESTAMP,
-                            SCAN_MODE.key()));
+            checkOptionExistInMode(options, SCAN_TIMESTAMP_MILLIS, StartupMode.FROM_TIMESTAMP);
+            checkOptionsConflict(options, SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS);
+        } else if (options.startupMode() == StartupMode.FROM_SNAPSHOT) {
+            checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, StartupMode.FROM_SNAPSHOT);
+            checkOptionsConflict(options, SCAN_TIMESTAMP_MILLIS, SCAN_SNAPSHOT_ID);
         } else {
-            Preconditions.checkArgument(
-                    options.logScanTimestampMills() == null,
-                    String.format(
-                            "%s should be %s when you set %s",
-                            SCAN_MODE.key(),
-                            StartupMode.FROM_TIMESTAMP,
-                            SCAN_TIMESTAMP_MILLIS.key()));
+            checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode());
+            checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
         }
 
         Preconditions.checkArgument(
@@ -817,6 +832,32 @@ public class CoreOptions implements Serializable {
                         });
     }
 
+    /**
+     * Requires that {@code option} is present in the configuration when {@code startupMode} is
+     * used.
+     *
+     * @throws IllegalArgumentException if the option is missing
+     */
+    private static void checkOptionExistInMode(
+            CoreOptions options, ConfigOption<?> option, StartupMode startupMode) {
+        boolean configured = options.options.contains(option);
+        String message =
+                String.format(
+                        "%s can not be null when you use %s for %s",
+                        option.key(), startupMode, SCAN_MODE.key());
+        Preconditions.checkArgument(configured, message);
+    }
+
+    /**
+     * Requires that {@code option} is absent from the configuration when {@code startupMode} is
+     * used.
+     *
+     * @throws IllegalArgumentException if the option is set
+     */
+    private static void checkOptionNotExistInMode(
+            CoreOptions options, ConfigOption<?> option, StartupMode startupMode) {
+        boolean absent = !options.options.contains(option);
+        String message =
+                String.format(
+                        "%s must be null when you use %s for %s",
+                        option.key(), startupMode, SCAN_MODE.key());
+        Preconditions.checkArgument(absent, message);
+    }
+
+    /**
+     * Requires that {@code illegalOption} is absent from the configuration; the error message
+     * names {@code legalOption} as the option it conflicts with.
+     *
+     * @throws IllegalArgumentException if {@code illegalOption} is set
+     */
+    private static void checkOptionsConflict(
+            CoreOptions options, ConfigOption<?> illegalOption, ConfigOption<?> legalOption) {
+        boolean noConflict = !options.options.contains(illegalOption);
+        String message =
+                String.format(
+                        "%s must be null when you set %s", illegalOption.key(), legalOption.key());
+        Preconditions.checkArgument(noConflict, message);
+    }
+
     @Internal
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = CoreOptions.class.getFields();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
index 8400a0cb..a7004afe 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -129,6 +129,23 @@ public class SnapshotManager {
         return earliest - 1;
     }
 
+    /**
+     * Finds the newest snapshot whose {@code timeMillis()} is earlier than or equal to the given
+     * timestamp.
+     *
+     * <p>Snapshot ids are checked one by one, from the latest id down to the earliest id, and the
+     * first match is returned.
+     *
+     * @param timestampMills inclusive upper bound for the snapshot time
+     * @return the id of the matching snapshot, or {@code null} if there is no snapshot at all or
+     *     none of them is early enough
+     */
+    public @Nullable Long earlierOrEqualTimeMills(long timestampMills) {
+        Long earliestId = earliestSnapshotId();
+        Long latestId = latestSnapshotId();
+        if (earliestId == null || latestId == null) {
+            return null;
+        }
+
+        long candidate = latestId;
+        while (candidate >= earliestId) {
+            if (snapshot(candidate).timeMillis() <= timestampMills) {
+                return candidate;
+            }
+            candidate--;
+        }
+        return null;
+    }
+
     public long snapshotCount() throws IOException {
         return listVersionedFiles(snapshotDirectory(), SNAPSHOT_PREFIX).count();
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index ba69827d..bee9b94f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -131,7 +131,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
 
     private static StartingScanner createStartingScanner(DataTable table) {
         CoreOptions.StartupMode startupMode = table.options().startupMode();
-        Long startupMillis = table.options().logScanTimestampMills();
+        Long startupMillis = table.options().scanTimestampMills();
         if (startupMode == CoreOptions.StartupMode.LATEST_FULL) {
             return new FullStartingScanner();
         } else if (startupMode == CoreOptions.StartupMode.LATEST) {
@@ -147,6 +147,16 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
                             CoreOptions.StartupMode.FROM_TIMESTAMP,
                             CoreOptions.SCAN_MODE.key()));
             return new ContinuousFromTimestampStartingScanner(startupMillis);
+        } else if (startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT) {
+            Long snapshotId = table.options().scanSnapshotId();
+            Preconditions.checkNotNull(
+                    snapshotId,
+                    String.format(
+                            "%s can not be null when you use %s for %s",
+                            CoreOptions.SCAN_SNAPSHOT_ID.key(),
+                            CoreOptions.StartupMode.FROM_SNAPSHOT,
+                            CoreOptions.SCAN_MODE.key()));
+            return new ContinuousFromSnapshotStartingScanner(snapshotId);
         } else {
             throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
new file mode 100644
index 00000000..4fa08d59
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import java.util.Collections;
+
+/**
+ * Streaming-read {@link StartingScanner} used for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode.
+ */
+public class ContinuousFromSnapshotStartingScanner implements StartingScanner {
+
+    private final long snapshotId;
+
+    public ContinuousFromSnapshotStartingScanner(long snapshotId) {
+        this.snapshotId = snapshotId;
+    }
+
+    /**
+     * Builds an empty plan positioned one snapshot before the effective starting snapshot.
+     *
+     * @return {@code null} if the table has no snapshot yet, otherwise a plan without any splits
+     */
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long earliestOrNull = snapshotManager.earliestSnapshotId();
+        if (earliestOrNull == null) {
+            return null;
+        }
+        // A requested id below the earliest snapshot is clamped to the earliest one. The plan
+        // then uses `start - 1` so that scanning delta data begins at the starting snapshot.
+        long effectiveStart = Math.max(snapshotId, earliestOrNull);
+        return new DataTableScan.DataFilePlan(effectiveStart - 1, Collections.emptyList());
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
index 4c23caf8..cee17020 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.DataTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -69,6 +70,26 @@ public class StaticDataFileSnapshotEnumerator implements SnapshotEnumerator {
             startingScanner = new FullStartingScanner();
         } else if (startupMode == CoreOptions.StartupMode.COMPACTED_FULL) {
             startingScanner = new CompactedStartingScanner();
+        } else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
+            Long startupMillis = table.options().scanTimestampMills();
+            Preconditions.checkNotNull(
+                    startupMillis,
+                    String.format(
+                            "%s can not be null when you use %s for %s",
+                            CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+                            CoreOptions.StartupMode.FROM_TIMESTAMP,
+                            CoreOptions.SCAN_MODE.key()));
+            startingScanner = new StaticFromTimestampStartingScanner(startupMillis);
+        } else if (startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT) {
+            Long snapshotId = table.options().scanSnapshotId();
+            Preconditions.checkNotNull(
+                    snapshotId,
+                    String.format(
+                            "%s can not be null when you use %s for %s",
+                            CoreOptions.SCAN_SNAPSHOT_ID.key(),
+                            CoreOptions.StartupMode.FROM_SNAPSHOT,
+                            CoreOptions.SCAN_MODE.key()));
+            startingScanner = new StaticFromSnapshotStartingScanner(snapshotId);
         } else {
             throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromSnapshotStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromSnapshotStartingScanner.java
new file mode 100644
index 00000000..2cc19567
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a batch read.
+ */
+public class StaticFromSnapshotStartingScanner implements StartingScanner {
+    private final long snapshotId;
+
+    public StaticFromSnapshotStartingScanner(long snapshotId) {
+        this.snapshotId = snapshotId;
+    }
+
+    /**
+     * Plans a scan of kind {@link ScanKind#ALL} on the configured snapshot.
+     *
+     * @return {@code null} if the table has no snapshot or the configured id is lower than the
+     *     earliest snapshot id, otherwise the plan for the configured snapshot
+     */
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        // Query the earliest id once and reuse it: a second call repeats the lookup, and if it
+        // returned null while the first one did not, unboxing it in the comparison would throw.
+        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        if (earliestSnapshotId == null || snapshotId < earliestSnapshotId) {
+            return null;
+        }
+        return scan.withKind(ScanKind.ALL).withSnapshot(snapshotId).plan();
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromTimestampStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromTimestampStartingScanner.java
new file mode 100644
index 00000000..5b81a4e5
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Batch-read {@link StartingScanner} used for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode.
+ */
+public class StaticFromTimestampStartingScanner implements StartingScanner {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(StaticFromTimestampStartingScanner.class);
+
+    private final long startupMillis;
+
+    public StaticFromTimestampStartingScanner(long startupMillis) {
+        this.startupMillis = startupMillis;
+    }
+
+    /**
+     * Plans a scan of kind {@link ScanKind#ALL} on the snapshot that {@link
+     * SnapshotManager#earlierOrEqualTimeMills(long)} returns for the configured timestamp.
+     *
+     * @return the plan for that snapshot, or {@code null} if no such snapshot exists
+     */
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long matchedSnapshotId = snapshotManager.earlierOrEqualTimeMills(startupMillis);
+        if (matchedSnapshotId != null) {
+            return scan.withKind(ScanKind.ALL).withSnapshot(matchedSnapshotId).plan();
+        }
+
+        LOG.debug(
+                "There is currently no snapshot earlier than or equal to timestamp[{}]",
+                startupMillis);
+        return null;
+    }
+}