Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/06 07:44:38 UTC

[flink-table-store] branch master updated: [FLINK-30294] Change table property key 'log.scan' to 'scan.mode' and add a default startup mode in Table Store

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new dadd735d [FLINK-30294] Change table property key 'log.scan' to 'scan.mode' and add a default startup mode in Table Store
dadd735d is described below

commit dadd735d362d355d60b737bf2c9669cb2fc880f6
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Dec 6 15:44:34 2022 +0800

    [FLINK-30294] Change table property key 'log.scan' to 'scan.mode' and add a default startup mode in Table Store
    
    This closes #422.
---
 docs/content/docs/development/streaming-query.md   | 34 +++++++--
 .../shortcodes/generated/core_configuration.html   | 24 +++---
 .../store/connector/source/FlinkSourceBuilder.java |  9 ++-
 .../ComputedColumnAndWatermarkTableITCase.java     | 12 +--
 .../store/connector/ReadWriteTableTestBase.java    | 12 +--
 .../org/apache/flink/table/store/CoreOptions.java  | 89 +++++++++++++++-------
 .../store/table/source/TableStreamingReader.java   |  2 +-
 .../snapshot/DataFileSnapshotEnumerator.java       | 12 +--
 .../source/snapshot/DeltaSnapshotEnumerator.java   |  2 +-
 .../FullCompactionChangelogSnapshotEnumerator.java |  2 +-
 .../snapshot/InputChangelogSnapshotEnumerator.java |  2 +-
 .../apache/flink/table/store/CoreOptionsTest.java  | 53 +++++++++++++
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  2 +-
 .../snapshot/DeltaSnapshotEnumeratorTest.java      |  6 +-
 ...lCompactionChangelogSnapshotEnumeratorTest.java |  6 +-
 .../InputChangelogSnapshotEnumeratorTest.java      |  6 +-
 .../table/store/kafka/KafkaLogSourceProvider.java  |  6 +-
 .../table/store/kafka/KafkaLogStoreFactory.java    |  9 ++-
 18 files changed, 199 insertions(+), 89 deletions(-)
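
For readers tracking the migration: 'log.scan' is kept as a deprecated alias of 'scan.mode' (via withDeprecatedKeys in the CoreOptions change below), so existing jobs keep working. A minimal sketch of that resolution, assuming the flink-table-store artifact is on the classpath; the class name and printed output are illustrative only:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;

    public class ScanModeKeyDemo {
        public static void main(String[] args) {
            // Old, deprecated key still resolves to the renamed option.
            Configuration withOldKey = new Configuration();
            withOldKey.setString("log.scan", "latest");
            System.out.println(withOldKey.get(CoreOptions.SCAN_MODE)); // LATEST

            // New key, same result.
            Configuration withNewKey = new Configuration();
            withNewKey.setString("scan.mode", "latest");
            System.out.println(withNewKey.get(CoreOptions.SCAN_MODE)); // LATEST
        }
    }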

diff --git a/docs/content/docs/development/streaming-query.md b/docs/content/docs/development/streaming-query.md
index e7c0208b..8acf9531 100644
--- a/docs/content/docs/development/streaming-query.md
+++ b/docs/content/docs/development/streaming-query.md
@@ -42,10 +42,10 @@ SELECT * FROM MyTable;
 
 -- Streaming mode, streaming reading, read latest incremental
 SET 'execution.runtime-mode' = 'streaming';
-SELECT * FROM MyTable /*+ OPTIONS ('log.scan'='latest') */;
+SELECT * FROM MyTable /*+ OPTIONS ('scan.mode'='latest') */;
 ```
 
-Different `log.scan` mode will result in different consuming behavior under streaming mode.
+Different `scan.mode` values will result in different consuming behavior under streaming mode.
 <table class="table table-bordered">
     <thead>
     <tr>
@@ -56,14 +56,36 @@ Different `log.scan` mode will result in different consuming behavior under stre
     </thead>
     <tbody>
     <tr>
-      <td><h5>FULL</h5></td>
+      <td><h5>default</h5></td>
       <td>Yes</td>
-      <td>FULL scan mode performs a hybrid reading with a snapshot scan and the streaming incremental scan.</td>
+      <td>
+      Determines the actual startup mode according to other table properties.
+      If "scan.timestamp-millis" is set, the actual startup mode will be "from-timestamp".
+      Otherwise, the actual startup mode will be "full".
+      </td>
     </tr>
     <tr>
-      <td><h5>LATEST</h5></td>
+      <td><h5>full</h5></td>
       <td>No</td>
-      <td>LATEST scan mode only reads incremental data from the latest offset.</td>
+      <td>
+      Produces a snapshot on the table upon first startup,
+      and continues to read the latest changes.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>latest</h5></td>
+      <td>No</td>
+      <td>
+      Continuously reads the latest changes without producing a snapshot at the beginning.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>from-timestamp</h5></td>
+      <td>No</td>
+      <td>
+      Continuously reads changes starting from the timestamp specified by "scan.timestamp-millis",
+      without producing a snapshot at the beginning.
+      </td>
     </tr>
     </tbody>
 </table>
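
The "default" row above is the new startup mode added by this commit: the effective mode is derived from other table properties instead of being fixed. A small sketch of that resolution, assuming the flink-table-store artifact is on the classpath (CoreOptions.startupMode is added further down in this diff; the class name is illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;

    public class DefaultScanModeDemo {
        public static void main(String[] args) {
            // No scan.mode and no scan.timestamp-millis: resolves to FULL.
            Configuration conf = new Configuration();
            System.out.println(CoreOptions.startupMode(conf)); // FULL

            // Only scan.timestamp-millis set: resolves to FROM_TIMESTAMP.
            conf.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, System.currentTimeMillis());
            System.out.println(CoreOptions.startupMode(conf)); // FROM_TIMESTAMP
        }
    }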
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 1f7401ad..14c721f3 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -122,24 +122,12 @@
             <td>Duration</td>
             <td>It means how long changes log will be kept. The default value is from the log system cluster.</td>
         </tr>
-        <tr>
-            <td><h5>log.scan</h5></td>
-            <td style="word-wrap: break-word;">full</td>
-            <td><p>Enum</p></td>
-            <td>Specify the startup mode for log consumer.<br /><br />Possible values:<ul><li>"full": Perform a snapshot on the table upon first startup, and continue to read the latest changes.</li><li>"latest": Start from the latest.</li><li>"from-timestamp": Start from user-supplied timestamp.</li></ul></td>
-        </tr>
         <tr>
             <td><h5>log.scan.remove-normalize</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
             <td>Whether to force the removal of the normalize node when streaming read. Note: This is dangerous and is likely to cause data errors if downstream is used to calculate aggregation and the input is not complete changelog.</td>
         </tr>
-        <tr>
-            <td><h5>log.scan.timestamp-millis</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>Long</td>
-            <td>Optional timestamp used in case of "from-timestamp" scan mode</td>
-        </tr>
         <tr>
             <td><h5>manifest.format</h5></td>
             <td style="word-wrap: break-word;">"avro"</td>
@@ -206,6 +194,18 @@
             <td>Boolean</td>
             <td>Whether to read compacted snapshot only.</td>
         </tr>
+        <tr>
+            <td><h5>scan.mode</h5></td>
+            <td style="word-wrap: break-word;">default</td>
+            <td><p>Enum</p></td>
+            <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines the actual startup mode according to other table properties. If "scan.timestamp-millis" is set, the actual startup mode will be "from-timestamp". Otherwise, the actual startup mode will be "full".</li><li>"full": For streaming sources, produces a snapshot on the table upon first startup, and continues to read the latest changes. For batch sources, just produces a snapshot but does not read new changes.</li><li>"latest": For streaming sources, continuously reads the latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "full" startup mode.</li><li>"from-timestamp": For streaming sources, continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at the timestamp specified by "scan.timestamp-millis" but does not read new changes.</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>scan.timestamp-millis</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>Optional timestamp used in case of "from-timestamp" scan mode.</td>
+        </tr>
         <tr>
             <td><h5>sequence.field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 57b544e4..b2495234 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -29,8 +29,9 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.MergeEngine;
+import org.apache.flink.table.store.CoreOptions.StartupMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -46,7 +47,6 @@ import java.util.Optional;
 import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
 import static org.apache.flink.table.store.CoreOptions.MERGE_ENGINE;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
 
@@ -146,11 +146,12 @@ public class FlinkSourceBuilder {
                                 + "You can use full compaction changelog producer to support streaming reading.");
             }
 
-            LogStartupMode startupMode = conf.get(LOG_SCAN);
+            // TODO visit all options through CoreOptions
+            StartupMode startupMode = CoreOptions.startupMode(conf);
             if (logSourceProvider == null) {
                 return buildContinuousFileSource();
             } else {
-                if (startupMode != LogStartupMode.FULL) {
+                if (startupMode != StartupMode.FULL) {
                     return logSourceProvider.createSource(null);
                 }
                 return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
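
The branch above chooses between a pure log source and a hybrid snapshot-plus-log source based on the effective startup mode. A condensed, self-contained model of just that decision (the real builder wires actual Flink sources; the class name and printed strings here are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;
    import org.apache.flink.table.store.CoreOptions.StartupMode;

    public class SourceChoiceDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(CoreOptions.SCAN_MODE, StartupMode.LATEST);
            StartupMode mode = CoreOptions.startupMode(conf);
            // Mirrors FlinkSourceBuilder: only the effective FULL mode needs the
            // hybrid source that prepends a snapshot read to the log source.
            String choice =
                    mode != StartupMode.FULL
                            ? "log source only"
                            : "hybrid snapshot + log source";
            System.out.println(mode + " -> " + choice); // LATEST -> log source only
        }
    }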
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
index febc784d..80199c87 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.StartupMode;
 
 import org.junit.Test;
 
@@ -29,7 +29,7 @@ import java.util.Collections;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
+import static org.apache.flink.table.store.CoreOptions.SCAN_MODE;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.ratesWithTimestamp;
 
@@ -146,7 +146,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         null,
                         false,
                         Collections.singletonMap(
-                                LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
+                                SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase()),
                         "rate_by_to_currency IS NULL",
                         Arrays.asList(
                                 "corrected_rate_by_to_currency",
@@ -190,7 +190,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
                         false,
                         Collections.singletonMap(
-                                LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
+                                SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase()),
                         lateEventFilter,
                         Collections.emptyList(), // projection
                         Collections.singletonList(
@@ -213,7 +213,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         WatermarkSpec.of("ts1", "ts1 - INTERVAL '3' YEAR"),
                         false,
                         Collections.singletonMap(
-                                LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
+                                SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase()),
                         lateEventFilter.replaceAll("ts", "ts1"),
                         Arrays.asList("currency", "rate", "ts1"),
                         Collections.singletonList(
@@ -237,7 +237,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
                         false,
                         Collections.singletonMap(
-                                LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
+                                SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase()),
                         lateEventFilter,
                         Arrays.asList(
                                 "currency",
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index 1da0c1ed..41867850 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.StartupMode;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
 import org.apache.flink.types.Row;
@@ -49,8 +49,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
+import static org.apache.flink.table.store.CoreOptions.SCAN_MODE;
+import static org.apache.flink.table.store.CoreOptions.SCAN_TIMESTAMP_MILLIS;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
@@ -232,7 +232,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
             List<Row> expected)
             throws Exception {
         Map<String, String> hints = new HashMap<>();
-        hints.put(LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase());
+        hints.put(SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase());
         collectAndCheckUnderSameEnv(
                         true,
                         true,
@@ -258,8 +258,8 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
             List<Row> expected)
             throws Exception {
         Map<String, String> hints = new HashMap<>();
-        hints.put(LOG_SCAN.key(), "from-timestamp");
-        hints.put(LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
+        hints.put(SCAN_MODE.key(), "from-timestamp");
+        hints.put(SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(timestamp));
         collectAndCheckUnderSameEnv(
                         true,
                         true,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 08bad53b..ed5b1474 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.core.fs.Path;
@@ -327,18 +328,20 @@ public class CoreOptions implements Serializable {
                             "The field that generates the sequence number for primary key table,"
                                     + " the sequence number determines which data is the most recent.");
 
-    public static final ConfigOption<LogStartupMode> LOG_SCAN =
-            ConfigOptions.key("log.scan")
-                    .enumType(LogStartupMode.class)
-                    .defaultValue(LogStartupMode.FULL)
-                    .withDescription("Specify the startup mode for log consumer.");
+    public static final ConfigOption<StartupMode> SCAN_MODE =
+            ConfigOptions.key("scan.mode")
+                    .enumType(StartupMode.class)
+                    .defaultValue(StartupMode.DEFAULT)
+                    .withDeprecatedKeys("log.scan")
+                    .withDescription("Specify the scanning behavior of the source.");
 
-    public static final ConfigOption<Long> LOG_SCAN_TIMESTAMP_MILLS =
-            ConfigOptions.key("log.scan.timestamp-millis")
+    public static final ConfigOption<Long> SCAN_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.timestamp-millis")
                     .longType()
                     .noDefaultValue()
+                    .withDeprecatedKeys("log.scan.timestamp-millis")
                     .withDescription(
-                            "Optional timestamp used in case of \"from-timestamp\" scan mode");
+                            "Optional timestamp used in case of \"from-timestamp\" scan mode.");
 
     public static final ConfigOption<Duration> LOG_RETENTION =
             ConfigOptions.key("log.retention")
@@ -526,12 +529,25 @@ public class CoreOptions implements Serializable {
         return options.get(CHANGELOG_PRODUCER);
     }
 
-    public LogStartupMode logStartupMode() {
-        return options.get(LOG_SCAN);
+    public StartupMode startupMode() {
+        return startupMode(options);
+    }
+
+    public static StartupMode startupMode(ReadableConfig options) {
+        StartupMode mode = options.get(SCAN_MODE);
+        if (mode == StartupMode.DEFAULT) {
+            if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
+                return StartupMode.FROM_TIMESTAMP;
+            } else {
+                return StartupMode.FULL;
+            }
+        } else {
+            return mode;
+        }
     }
 
     public Long logScanTimestampMills() {
-        return options.get(LOG_SCAN_TIMESTAMP_MILLS);
+        return options.get(SCAN_TIMESTAMP_MILLIS);
     }
 
     public Duration changelogProducerFullCompactionTriggerInterval() {
@@ -582,20 +598,37 @@ public class CoreOptions implements Serializable {
     }
 
     /** Specifies the startup mode for log consumer. */
-    public enum LogStartupMode implements DescribedEnum {
+    public enum StartupMode implements DescribedEnum {
+        DEFAULT(
+                "deafult",
+                "Determines actual startup mode according to other table properties. "
+                        + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\". "
+                        + "Otherwise the actual startup mode will be \"full\"."),
+
         FULL(
                 "full",
-                "Perform a snapshot on the table upon first startup,"
-                        + " and continue to read the latest changes."),
-
-        LATEST("latest", "Start from the latest."),
-
-        FROM_TIMESTAMP("from-timestamp", "Start from user-supplied timestamp.");
+                "For streaming sources, produces a snapshot on the table upon first startup,"
+                        + " and continue to read the latest changes. "
+                        + "For batch sources, just produce a snapshot but does not read new changes."),
+
+        LATEST(
+                "latest",
+                "For streaming sources, continuously reads latest changes "
+                        + "without producing a snapshot at the beginning. "
+                        + "For batch sources, behaves the same as the \"full\" startup mode."),
+
+        FROM_TIMESTAMP(
+                "from-timestamp",
+                "For streaming sources, continuously reads changes "
+                        + "starting from timestamp specified by \"scan.timestamp-millis\", "
+                        + "without producing a snapshot at the beginning. "
+                        + "For batch sources, produces a snapshot at timestamp specified by \"scan.timestamp-millis\" "
+                        + "but does not read new changes.");
 
         private final String value;
         private final String description;
 
-        LogStartupMode(String value, String description) {
+        StartupMode(String value, String description) {
             this.value = value;
             this.description = description;
         }
@@ -707,8 +740,8 @@ public class CoreOptions implements Serializable {
      * @param options the options to set default values
      */
     public static void setDefaultValues(Configuration options) {
-        if (options.contains(LOG_SCAN_TIMESTAMP_MILLS) && !options.contains(LOG_SCAN)) {
-            options.set(LOG_SCAN, LogStartupMode.FROM_TIMESTAMP);
+        if (options.contains(SCAN_TIMESTAMP_MILLIS) && !options.contains(SCAN_MODE)) {
+            options.set(SCAN_MODE, StartupMode.FROM_TIMESTAMP);
         }
     }
 
@@ -721,22 +754,22 @@ public class CoreOptions implements Serializable {
      */
     public static void validateTableSchema(TableSchema schema) {
         CoreOptions options = new CoreOptions(schema.options());
-        if (options.logStartupMode() == LogStartupMode.FROM_TIMESTAMP) {
+        if (options.startupMode() == StartupMode.FROM_TIMESTAMP) {
             Preconditions.checkArgument(
                     options.logScanTimestampMills() != null,
                     String.format(
                             "%s can not be null when you use %s for %s",
-                            LOG_SCAN_TIMESTAMP_MILLS.key(),
-                            LogStartupMode.FROM_TIMESTAMP,
-                            LOG_SCAN.key()));
+                            SCAN_TIMESTAMP_MILLIS.key(),
+                            StartupMode.FROM_TIMESTAMP,
+                            SCAN_MODE.key()));
         } else {
             Preconditions.checkArgument(
                     options.logScanTimestampMills() == null,
                     String.format(
                             "%s should be %s when you set %s",
-                            LOG_SCAN.key(),
-                            LogStartupMode.FROM_TIMESTAMP,
-                            LOG_SCAN_TIMESTAMP_MILLS.key()));
+                            SCAN_MODE.key(),
+                            StartupMode.FROM_TIMESTAMP,
+                            SCAN_TIMESTAMP_MILLIS.key()));
         }
 
         Preconditions.checkArgument(
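
Note that setDefaultValues above is kept as a compatibility shim: a configuration carrying only scan.timestamp-millis is rewritten to an explicit from-timestamp mode before validation runs. A minimal sketch, assuming the flink-table-store artifact is on the classpath (class name illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;

    public class SetDefaultValuesDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, System.currentTimeMillis());
            CoreOptions.setDefaultValues(conf);
            // scan.mode was absent, so it is now pinned explicitly.
            System.out.println(conf.get(CoreOptions.SCAN_MODE)); // FROM_TIMESTAMP
        }
    }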
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index be2d05e5..b875d220 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -89,7 +89,7 @@ public class TableStreamingReader {
         }
         enumerator =
                 new DeltaSnapshotEnumerator(
-                        table.location(), scan, CoreOptions.LogStartupMode.FULL, null, null);
+                        table.location(), scan, CoreOptions.StartupMode.FULL, null, null);
     }
 
     @Nullable
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
index c345eac5..54d201dd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
@@ -41,7 +41,7 @@ public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
 
     private final SnapshotManager snapshotManager;
     private final DataTableScan scan;
-    private final CoreOptions.LogStartupMode startupMode;
+    private final CoreOptions.StartupMode startupMode;
     private @Nullable final Long startupMillis;
 
     private @Nullable Long nextSnapshotId;
@@ -49,7 +49,7 @@ public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
     public DataFileSnapshotEnumerator(
             Path tablePath,
             DataTableScan scan,
-            CoreOptions.LogStartupMode startupMode,
+            CoreOptions.StartupMode startupMode,
             @Nullable Long startupMillis,
             @Nullable Long nextSnapshotId) {
         this.snapshotManager = new SnapshotManager(tablePath);
@@ -86,9 +86,9 @@ public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
                         startupMillis,
                         String.format(
                                 "%s can not be null when you use %s for %s",
-                                CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
-                                CoreOptions.LogStartupMode.FROM_TIMESTAMP,
-                                CoreOptions.LOG_SCAN.key()));
+                                CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+                                CoreOptions.StartupMode.FROM_TIMESTAMP,
+                                CoreOptions.SCAN_MODE.key()));
                 startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis);
                 plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
                 break;
@@ -133,7 +133,7 @@ public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
     public static DataFileSnapshotEnumerator create(
             FileStoreTable table, DataTableScan scan, Long nextSnapshotId) {
         Path location = table.location();
-        CoreOptions.LogStartupMode startupMode = table.options().logStartupMode();
+        CoreOptions.StartupMode startupMode = table.options().startupMode();
         Long startupMillis = table.options().logScanTimestampMills();
 
         switch (table.options().changelogProducer()) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
index 857ef88d..36642ff8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
@@ -40,7 +40,7 @@ public class DeltaSnapshotEnumerator extends DataFileSnapshotEnumerator {
     public DeltaSnapshotEnumerator(
             Path tablePath,
             DataTableScan scan,
-            CoreOptions.LogStartupMode startupMode,
+            CoreOptions.StartupMode startupMode,
             @Nullable Long startupMillis,
             @Nullable Long nextSnapshotId) {
         super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
index 85c90f72..ed9e5f16 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
@@ -46,7 +46,7 @@ public class FullCompactionChangelogSnapshotEnumerator extends DataFileSnapshotE
             Path tablePath,
             DataTableScan scan,
             int maxLevel,
-            CoreOptions.LogStartupMode startupMode,
+            CoreOptions.StartupMode startupMode,
             @Nullable Long startupMillis,
             @Nullable Long nextSnapshotId) {
         super(tablePath, scan.withLevel(maxLevel), startupMode, startupMillis, nextSnapshotId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
index 03f221bf..fffe71f4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
@@ -44,7 +44,7 @@ public class InputChangelogSnapshotEnumerator extends DataFileSnapshotEnumerator
     public InputChangelogSnapshotEnumerator(
             Path tablePath,
             DataTableScan scan,
-            CoreOptions.LogStartupMode startupMode,
+            CoreOptions.StartupMode startupMode,
             @Nullable Long startupMillis,
             Long nextSnapshotId) {
         super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
new file mode 100644
index 00000000..f7c91231
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CoreOptions}. */
+public class CoreOptionsTest {
+
+    @Test
+    public void testDefaultStartupMode() {
+        Configuration conf = new Configuration();
+        assertThat(conf.get(CoreOptions.SCAN_MODE)).isEqualTo(CoreOptions.StartupMode.DEFAULT);
+        assertThat(new CoreOptions(conf).startupMode()).isEqualTo(CoreOptions.StartupMode.FULL);
+
+        conf = new Configuration();
+        conf.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, System.currentTimeMillis());
+        assertThat(new CoreOptions(conf).startupMode())
+                .isEqualTo(CoreOptions.StartupMode.FROM_TIMESTAMP);
+    }
+
+    @Test
+    public void testStartupModeCompatibility() {
+        Configuration conf = new Configuration();
+        conf.setString("log.scan", "latest");
+        assertThat(new CoreOptions(conf).startupMode()).isEqualTo(CoreOptions.StartupMode.LATEST);
+
+        conf = new Configuration();
+        conf.setString("log.scan.timestamp-millis", String.valueOf(System.currentTimeMillis()));
+        assertThat(new CoreOptions(conf).startupMode())
+                .isEqualTo(CoreOptions.StartupMode.FROM_TIMESTAMP);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 8342bbd8..aae4a784 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -374,7 +374,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
 
         SnapshotEnumerator enumerator =
                 new InputChangelogSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.LogStartupMode.FULL, null, 1L);
+                        tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, 1L);
 
         FunctionWithException<Integer, Void, Exception> assertNextSnapshot =
                 i -> {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
index 65fe9f53..3252d022 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
@@ -44,7 +44,7 @@ public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestB
         TableCommit commit = table.newCommit(commitUser);
         SnapshotEnumerator enumerator =
                 new DeltaSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.LogStartupMode.FULL, null, null);
+                        tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, null);
 
         // first call without any snapshot, should return null
         assertThat(enumerator.enumerate()).isNull();
@@ -133,7 +133,7 @@ public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestB
                 new DeltaSnapshotEnumerator(
                         tablePath,
                         table.newScan(),
-                        CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                        CoreOptions.StartupMode.FROM_TIMESTAMP,
                         startMillis,
                         null);
 
@@ -192,7 +192,7 @@ public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestB
 
         SnapshotEnumerator enumerator =
                 new DeltaSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.LogStartupMode.LATEST, null, null);
+                        tablePath, table.newScan(), CoreOptions.StartupMode.LATEST, null, null);
 
         DataTableScan.DataFilePlan plan = enumerator.enumerate();
         assertThat(plan.snapshotId).isEqualTo(2);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
index 72531f4f..9a3e890f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
@@ -48,7 +48,7 @@ public class FullCompactionChangelogSnapshotEnumeratorTest
                         tablePath,
                         table.newScan(),
                         table.options().numLevels() - 1,
-                        CoreOptions.LogStartupMode.FULL,
+                        CoreOptions.StartupMode.FULL,
                         null,
                         null);
 
@@ -148,7 +148,7 @@ public class FullCompactionChangelogSnapshotEnumeratorTest
                         tablePath,
                         table.newScan(),
                         table.options().numLevels() - 1,
-                        CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                        CoreOptions.StartupMode.FROM_TIMESTAMP,
                         startMillis,
                         null);
 
@@ -206,7 +206,7 @@ public class FullCompactionChangelogSnapshotEnumeratorTest
                         tablePath,
                         table.newScan(),
                         table.options().numLevels() - 1,
-                        CoreOptions.LogStartupMode.LATEST,
+                        CoreOptions.StartupMode.LATEST,
                         null,
                         null);
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
index 5760110c..2c2fc091 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
@@ -44,7 +44,7 @@ public class InputChangelogSnapshotEnumeratorTest extends DataFileSnapshotEnumer
         TableCommit commit = table.newCommit(commitUser);
         SnapshotEnumerator enumerator =
                 new InputChangelogSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.LogStartupMode.FULL, null, null);
+                        tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, null);
 
         // first call without any snapshot, should return null
         assertThat(enumerator.enumerate()).isNull();
@@ -134,7 +134,7 @@ public class InputChangelogSnapshotEnumeratorTest extends DataFileSnapshotEnumer
                 new InputChangelogSnapshotEnumerator(
                         tablePath,
                         table.newScan(),
-                        CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                        CoreOptions.StartupMode.FROM_TIMESTAMP,
                         startMillis,
                         null);
 
@@ -194,7 +194,7 @@ public class InputChangelogSnapshotEnumeratorTest extends DataFileSnapshotEnumer
 
         SnapshotEnumerator enumerator =
                 new InputChangelogSnapshotEnumerator(
-                        tablePath, table.newScan(), CoreOptions.LogStartupMode.LATEST, null, null);
+                        tablePath, table.newScan(), CoreOptions.StartupMode.LATEST, null, null);
 
         DataTableScan.DataFilePlan plan = enumerator.enumerate();
         assertThat(plan.snapshotId).isEqualTo(2);
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index c1bcc617..ba3ab9e8 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -25,7 +25,7 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.CoreOptions.LogConsistency;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.StartupMode;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.types.DataType;
 
@@ -61,7 +61,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
 
     private final LogConsistency consistency;
 
-    private final LogStartupMode scanMode;
+    private final StartupMode scanMode;
 
     @Nullable private final Long timestampMills;
 
@@ -74,7 +74,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
             DeserializationSchema<RowData> valueDeserializer,
             @Nullable int[][] projectFields,
             LogConsistency consistency,
-            LogStartupMode scanMode,
+            StartupMode scanMode,
             @Nullable Long timestampMills) {
         this.topic = topic;
         this.properties = properties;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index d05a06cc..6a76e2eb 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -58,9 +59,8 @@ import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHel
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
 import static org.apache.flink.table.store.CoreOptions.LOG_RETENTION;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
 import static org.apache.flink.table.store.CoreOptions.LogConsistency;
+import static org.apache.flink.table.store.CoreOptions.SCAN_TIMESTAMP_MILLIS;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
@@ -198,8 +198,9 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                 valueDeserializer,
                 projectFields,
                 helper.getOptions().get(LOG_CONSISTENCY),
-                helper.getOptions().get(LOG_SCAN),
-                helper.getOptions().get(LOG_SCAN_TIMESTAMP_MILLS));
+                // TODO visit all options through CoreOptions
+                CoreOptions.startupMode(helper.getOptions()),
+                helper.getOptions().get(SCAN_TIMESTAMP_MILLIS));
     }
 
     @Override
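
Like FlinkSourceBuilder, the Kafka factory now resolves the effective mode through the static CoreOptions.startupMode, which accepts any ReadableConfig; the factory helper's options and a plain Configuration therefore resolve identically. A small sketch under that assumption (the timestamp literal and class name are arbitrary):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.store.CoreOptions;
    import org.apache.flink.table.store.CoreOptions.StartupMode;

    public class KafkaScanOptionsDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString("scan.mode", "from-timestamp");
            conf.setString("scan.timestamp-millis", "1670000000000"); // arbitrary epoch millis
            ReadableConfig options = conf; // stand-in for helper.getOptions()
            StartupMode mode = CoreOptions.startupMode(options);
            Long millis = options.get(CoreOptions.SCAN_TIMESTAMP_MILLIS);
            System.out.println(mode + " starting at " + millis);
        }
    }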