You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/26 03:14:17 UTC
[flink-table-store] branch master updated: [FLINK-30247] Introduce time travel reading for table store
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 7e9cf0dd [FLINK-30247] Introduce time travel reading for table store
7e9cf0dd is described below
commit 7e9cf0dd907c84b67850503ff61e27944363d06b
Author: shammon <zj...@gmail.com>
AuthorDate: Mon Dec 26 11:14:11 2022 +0800
[FLINK-30247] Introduce time travel reading for table store
This closes #410.
---
docs/content/docs/how-to/querying-tables.md | 11 ++-
.../shortcodes/generated/core_configuration.html | 8 +-
.../store/connector/BatchFileStoreITCase.java | 90 ++++++++++++++++++++++
.../store/connector/ContinuousFileStoreITCase.java | 31 ++++++++
.../org/apache/flink/table/store/CoreOptions.java | 75 ++++++++++++++----
.../table/store/file/utils/SnapshotManager.java | 17 ++++
.../ContinuousDataFileSnapshotEnumerator.java | 12 ++-
.../ContinuousFromSnapshotStartingScanner.java | 51 ++++++++++++
.../snapshot/StaticDataFileSnapshotEnumerator.java | 21 +++++
.../StaticFromSnapshotStartingScanner.java | 44 +++++++++++
.../StaticFromTimestampStartingScanner.java | 55 +++++++++++++
11 files changed, 393 insertions(+), 22 deletions(-)
diff --git a/docs/content/docs/how-to/querying-tables.md b/docs/content/docs/how-to/querying-tables.md
index 39140e3c..d54edd05 100644
--- a/docs/content/docs/how-to/querying-tables.md
+++ b/docs/content/docs/how-to/querying-tables.md
@@ -44,7 +44,7 @@ By specifying the `scan.mode` table property, users can specify where and how Ta
<tr>
<td>default</td>
<td colspan="2">
-The default scan mode. Determines actual scan mode according to other table properties. If "scan.timestamp-millis" is set the actual scan mode will be "from-timestamp". Otherwise the actual scan mode will be "latest-full".
+The default scan mode. Determines actual scan mode according to other table properties. If "scan.timestamp-millis" is set the actual scan mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual scan mode will be "from-snapshot". Otherwise the actual scan mode will be "latest-full".
</td>
</tr>
<tr>
@@ -72,9 +72,14 @@ Produces the snapshot after the latest compaction on the table upon first startu
</tr>
<tr>
<td>from-timestamp</td>
-<td>Unsupported</td>
+<td>Produces the latest snapshot whose commit time is earlier than or equal to the timestamp specified by "scan.timestamp-millis".</td>
<td>Continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning.</td>
</tr>
+<tr>
+<td>from-snapshot</td>
+<td>Produces a snapshot specified by "scan.snapshot-id".</td>
+<td>Continuously reads changes starting from a snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning.</td>
+</tr>
</tbody>
</table>
@@ -179,4 +184,4 @@ SELECT * FROM MyTable$audit_log;
+------------------+-----------------+-----------------+
3 rows in set
*/
-```
+```
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 369aea41..069444ad 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -192,7 +192,13 @@
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
<td><p>Enum</p></td>
- <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just pro [...]
+ <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon f [...]
+ </tr>
+ <tr>
+ <td><h5>scan.snapshot-id</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>Optional snapshot id used in case of "from-snapshot" scan mode</td>
</tr>
<tr>
<td><h5>scan.timestamp-millis</h5></td>
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index 131ddd99..b9560afe 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.connector;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.types.Row;
import org.junit.Test;
@@ -26,6 +27,7 @@ import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for batch file store. */
public class BatchFileStoreITCase extends FileStoreTableITCase {
@@ -50,4 +52,92 @@ public class BatchFileStoreITCase extends FileStoreTableITCase {
assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted-full') */"))
.isEmpty();
}
+
+ @Test
+ public void testTimeTravelRead() throws InterruptedException {
+ batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
+ long time1 = System.currentTimeMillis();
+
+ Thread.sleep(10);
+ batchSql("INSERT INTO T VALUES (3, 33, 333), (4, 44, 444)");
+ long time2 = System.currentTimeMillis();
+
+ Thread.sleep(10);
+ batchSql("INSERT INTO T VALUES (5, 55, 555), (6, 66, 666)");
+ long time3 = System.currentTimeMillis();
+
+ Thread.sleep(10);
+ batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)");
+
+ assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */"))
+ .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
+
+ assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='0') */")).isEmpty();
+
+ assertThat(
+ batchSql(
+ String.format(
+ "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
+ time1)))
+ .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
+
+ assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='2') */"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444));
+ assertThat(
+ batchSql(
+ String.format(
+ "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
+ time2)))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444));
+
+ assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444),
+ Row.of(5, 55, 555),
+ Row.of(6, 66, 666));
+ assertThat(
+ batchSql(
+ String.format(
+ "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
+ time3)))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444),
+ Row.of(5, 55, 555),
+ Row.of(6, 66, 666));
+
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ String.format(
+ "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s', 'scan.snapshot-id'='1') */",
+ time3)))
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "%s must be null when you set %s",
+ CoreOptions.SCAN_SNAPSHOT_ID.key(),
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key());
+
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "SELECT * FROM T /*+ OPTIONS('scan.mode'='full', 'scan.snapshot-id'='1') */"))
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "%s must be null when you use latest-full for scan.mode",
+ CoreOptions.SCAN_SNAPSHOT_ID.key());
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index 9195b9c6..829ed6ca 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -215,6 +215,37 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase {
.hasMessageContaining("Unable to create a source for reading table");
}
+ @Test
+ public void testConfigureStartupSnapshot() throws Exception {
+ // Configure 'scan.snapshot-id' without 'scan.mode'.
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1));
+ batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+ batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+ iterator.close();
+
+ // Start from earliest snapshot
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 0));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+ iterator.close();
+
+ // Configure 'scan.snapshot-id' with 'scan.mode=latest'.
+ assertThatThrownBy(
+ () ->
+ streamSqlIter(
+ "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
+ 0))
+ .hasMessageContaining("Unable to create a source for reading table");
+ }
+
@Test
public void testIgnoreOverwrite() throws TimeoutException {
BlockingIterator<Row, Row> iterator =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 280a1ab2..22974af1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -340,6 +340,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Optional timestamp used in case of \"from-timestamp\" scan mode.");
+ public static final ConfigOption<Long> SCAN_SNAPSHOT_ID =
+ ConfigOptions.key("scan.snapshot-id")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional snapshot id used in case of \"from-snapshot\" scan mode");
+
public static final ConfigOption<Duration> LOG_RETENTION =
ConfigOptions.key("log.retention")
.durationType()
@@ -543,6 +550,8 @@ public class CoreOptions implements Serializable {
if (mode == StartupMode.DEFAULT) {
if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
return StartupMode.FROM_TIMESTAMP;
+ } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()) {
+ return StartupMode.FROM_SNAPSHOT;
} else {
return StartupMode.LATEST_FULL;
}
@@ -553,10 +562,14 @@ public class CoreOptions implements Serializable {
}
}
- public Long logScanTimestampMills() {
+ public Long scanTimestampMills() {
return options.get(SCAN_TIMESTAMP_MILLIS);
}
+ public Long scanSnapshotId() {
+ return options.get(SCAN_SNAPSHOT_ID);
+ }
+
public Duration changelogProducerFullCompactionTriggerInterval() {
return options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL);
}
@@ -605,7 +618,8 @@ public class CoreOptions implements Serializable {
DEFAULT(
"default",
"Determines actual startup mode according to other table properties. "
- + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\". "
+ + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\", "
+ + "and if \"scan.snapshot-id\" is set the actual startup mode will be \"from-snapshot\". "
+ "Otherwise the actual startup mode will be \"latest-full\"."),
LATEST_FULL(
@@ -634,7 +648,15 @@ public class CoreOptions implements Serializable {
"For streaming sources, continuously reads changes "
+ "starting from timestamp specified by \"scan.timestamp-millis\", "
+ "without producing a snapshot at the beginning. "
- + "Unsupported for batch sources.");
+ + "For batch sources, produces a snapshot at timestamp specified by \"scan.timestamp-millis\" "
+ + "but does not read new changes."),
+
+ FROM_SNAPSHOT(
+ "from-snapshot",
+ "For streaming sources, continuously reads changes "
+ + "starting from snapshot specified by \"scan.snapshot-id\", "
+ + "without producing a snapshot at the beginning. For batch sources, "
+ + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes.");
private final String value;
private final String description;
@@ -766,21 +788,14 @@ public class CoreOptions implements Serializable {
public static void validateTableSchema(TableSchema schema) {
CoreOptions options = new CoreOptions(schema.options());
if (options.startupMode() == StartupMode.FROM_TIMESTAMP) {
- Preconditions.checkArgument(
- options.logScanTimestampMills() != null,
- String.format(
- "%s can not be null when you use %s for %s",
- SCAN_TIMESTAMP_MILLIS.key(),
- StartupMode.FROM_TIMESTAMP,
- SCAN_MODE.key()));
+ checkOptionExistInMode(options, SCAN_TIMESTAMP_MILLIS, StartupMode.FROM_TIMESTAMP);
+ checkOptionsConflict(options, SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS);
+ } else if (options.startupMode() == StartupMode.FROM_SNAPSHOT) {
+ checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, StartupMode.FROM_SNAPSHOT);
+ checkOptionsConflict(options, SCAN_TIMESTAMP_MILLIS, SCAN_SNAPSHOT_ID);
} else {
- Preconditions.checkArgument(
- options.logScanTimestampMills() == null,
- String.format(
- "%s should be %s when you set %s",
- SCAN_MODE.key(),
- StartupMode.FROM_TIMESTAMP,
- SCAN_TIMESTAMP_MILLIS.key()));
+ checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode());
+ checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
}
Preconditions.checkArgument(
@@ -817,6 +832,32 @@ public class CoreOptions implements Serializable {
});
}
+ private static void checkOptionExistInMode(
+ CoreOptions options, ConfigOption<?> option, StartupMode startupMode) {
+ Preconditions.checkArgument(
+ options.options.contains(option),
+ String.format(
+ "%s can not be null when you use %s for %s",
+ option.key(), startupMode, SCAN_MODE.key()));
+ }
+
+ private static void checkOptionNotExistInMode(
+ CoreOptions options, ConfigOption<?> option, StartupMode startupMode) {
+ Preconditions.checkArgument(
+ !options.options.contains(option),
+ String.format(
+ "%s must be null when you use %s for %s",
+ option.key(), startupMode, SCAN_MODE.key()));
+ }
+
+ private static void checkOptionsConflict(
+ CoreOptions options, ConfigOption<?> illegalOption, ConfigOption<?> legalOption) {
+ Preconditions.checkArgument(
+ !options.options.contains(illegalOption),
+ String.format(
+ "%s must be null when you set %s", illegalOption.key(), legalOption.key()));
+ }
+
@Internal
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = CoreOptions.class.getFields();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
index 8400a0cb..a7004afe 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -129,6 +129,23 @@ public class SnapshotManager {
return earliest - 1;
}
+ /** Returns the latest snapshot id committed at or before the timestamp, or null if none. */
+ public @Nullable Long earlierOrEqualTimeMills(long timestampMills) {
+ Long earliest = earliestSnapshotId();
+ Long latest = latestSnapshotId();
+ if (earliest == null || latest == null) {
+ return null;
+ }
+
+ for (long i = latest; i >= earliest; i--) {
+ long commitTime = snapshot(i).timeMillis();
+ if (commitTime <= timestampMills) {
+ return i;
+ }
+ }
+ return null;
+ }
+
public long snapshotCount() throws IOException {
return listVersionedFiles(snapshotDirectory(), SNAPSHOT_PREFIX).count();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index ba69827d..bee9b94f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -131,7 +131,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
private static StartingScanner createStartingScanner(DataTable table) {
CoreOptions.StartupMode startupMode = table.options().startupMode();
- Long startupMillis = table.options().logScanTimestampMills();
+ Long startupMillis = table.options().scanTimestampMills();
if (startupMode == CoreOptions.StartupMode.LATEST_FULL) {
return new FullStartingScanner();
} else if (startupMode == CoreOptions.StartupMode.LATEST) {
@@ -147,6 +147,16 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
CoreOptions.StartupMode.FROM_TIMESTAMP,
CoreOptions.SCAN_MODE.key()));
return new ContinuousFromTimestampStartingScanner(startupMillis);
+ } else if (startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT) {
+ Long snapshotId = table.options().scanSnapshotId();
+ Preconditions.checkNotNull(
+ snapshotId,
+ String.format(
+ "%s can not be null when you use %s for %s",
+ CoreOptions.SCAN_SNAPSHOT_ID.key(),
+ CoreOptions.StartupMode.FROM_SNAPSHOT,
+ CoreOptions.SCAN_MODE.key()));
+ return new ContinuousFromSnapshotStartingScanner(snapshotId);
} else {
throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
new file mode 100644
index 00000000..4fa08d59
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import java.util.Collections;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a streaming
+ * read.
+ */
+public class ContinuousFromSnapshotStartingScanner implements StartingScanner {
+
+ private final long snapshotId;
+
+ public ContinuousFromSnapshotStartingScanner(long snapshotId) {
+ this.snapshotId = snapshotId;
+ }
+
+ @Override
+ public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+ Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+ if (earliestSnapshotId == null) {
+ return null;
+ }
+ // Use `snapshotId - 1` here so that the delta scan starts from the specified snapshot
+ // itself. If snapshotId < earliestSnapshotId, start scanning from the earliest snapshot.
+ return new DataTableScan.DataFilePlan(
+ snapshotId >= earliestSnapshotId ? snapshotId - 1 : earliestSnapshotId - 1,
+ Collections.emptyList());
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
index 4c23caf8..cee17020 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.DataTable;
import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
@@ -69,6 +70,26 @@ public class StaticDataFileSnapshotEnumerator implements SnapshotEnumerator {
startingScanner = new FullStartingScanner();
} else if (startupMode == CoreOptions.StartupMode.COMPACTED_FULL) {
startingScanner = new CompactedStartingScanner();
+ } else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
+ Long startupMillis = table.options().scanTimestampMills();
+ Preconditions.checkNotNull(
+ startupMillis,
+ String.format(
+ "%s can not be null when you use %s for %s",
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+ CoreOptions.StartupMode.FROM_TIMESTAMP,
+ CoreOptions.SCAN_MODE.key()));
+ startingScanner = new StaticFromTimestampStartingScanner(startupMillis);
+ } else if (startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT) {
+ Long snapshotId = table.options().scanSnapshotId();
+ Preconditions.checkNotNull(
+ snapshotId,
+ String.format(
+ "%s can not be null when you use %s for %s",
+ CoreOptions.SCAN_SNAPSHOT_ID.key(),
+ CoreOptions.StartupMode.FROM_SNAPSHOT,
+ CoreOptions.SCAN_MODE.key()));
+ startingScanner = new StaticFromSnapshotStartingScanner(snapshotId);
} else {
throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromSnapshotStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromSnapshotStartingScanner.java
new file mode 100644
index 00000000..2cc19567
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a batch read.
+ */
+public class StaticFromSnapshotStartingScanner implements StartingScanner {
+ private final long snapshotId;
+
+ public StaticFromSnapshotStartingScanner(long snapshotId) {
+ this.snapshotId = snapshotId;
+ }
+
+ @Override
+ public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+ if (snapshotManager.earliestSnapshotId() == null
+ || snapshotId < snapshotManager.earliestSnapshotId()) {
+ return null;
+ }
+ return scan.withKind(ScanKind.ALL).withSnapshot(snapshotId).plan();
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromTimestampStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromTimestampStartingScanner.java
new file mode 100644
index 00000000..5b81a4e5
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StartingScanner} for the {@link
+ * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a batch
+ * read.
+ */
+public class StaticFromTimestampStartingScanner implements StartingScanner {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StaticFromTimestampStartingScanner.class);
+
+ private final long startupMillis;
+
+ public StaticFromTimestampStartingScanner(long startupMillis) {
+ this.startupMillis = startupMillis;
+ }
+
+ @Override
+ public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+ Long startingSnapshotId = snapshotManager.earlierOrEqualTimeMills(startupMillis);
+ if (startingSnapshotId == null) {
+ LOG.debug(
+ "There is currently no snapshot earlier than or equal to timestamp[{}]",
+ startupMillis);
+ return null;
+ }
+ return scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
+ }
+}