Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/05 06:19:57 UTC

[flink-table-store] branch master updated: [FLINK-28118] Introduce TableStoreOptions to merge all options

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 80b69f77 [FLINK-28118] Introduce TableStoreOptions to merge all options
80b69f77 is described below

commit 80b69f77af418da669bbeb70ee7e7320aafdb945
Author: liliwei <hi...@gmail.com>
AuthorDate: Tue Jul 5 14:19:51 2022 +0800

    [FLINK-28118] Introduce TableStoreOptions to merge all options
    
    This closes #196
---
 .../store/connector/TableStoreManagedFactory.java  |  18 +-
 .../store/connector/AbstractTableStoreFactory.java |  28 +-
 .../flink/table/store/connector/FlinkCatalog.java  |   2 +-
 .../store/connector/sink/FlinkSinkBuilder.java     |   4 +-
 .../table/store/connector/sink/TableStoreSink.java |   8 +-
 .../store/connector/source/FlinkSourceBuilder.java |  18 +-
 .../store/connector/source/TableStoreSource.java   |  12 +-
 .../ComputedColumnAndWatermarkTableITCase.java     |  20 +-
 .../table/store/connector/CreateTableITCase.java   |  12 +-
 .../table/store/connector/DropTableITCase.java     |   6 +-
 .../table/store/connector/FileStoreITCase.java     |  10 +-
 .../store/connector/FileStoreTableITCase.java      |   4 +-
 .../store/connector/ReadWriteTableTestBase.java    |  16 +-
 .../table/store/connector/RescaleBucketITCase.java |   2 +-
 .../connector/TableStoreManagedFactoryTest.java    |  29 +-
 .../table/store/connector/TableStoreTestBase.java  |   8 +-
 .../store/connector/sink/LogStoreSinkITCase.java   |   8 +-
 .../source/TestChangelogDataReadWrite.java         |   7 +-
 .../org/apache/flink/table/store/CoreOptions.java  | 582 +++++++++++++++++++++
 .../flink/table/store/file/AbstractFileStore.java  |   7 +-
 .../table/store/file/AppendOnlyFileStore.java      |   5 +-
 .../flink/table/store/file/KeyValueFileStore.java  |   5 +-
 .../file/operation/KeyValueFileStoreWrite.java     |   6 +-
 .../store/file/utils/FileStorePathFactory.java     |   4 +-
 .../table/store/log/LogStoreTableFactory.java      |  18 +-
 .../store/table/AppendOnlyFileStoreTable.java      |   4 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   4 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   6 +-
 .../table/store/table/FileStoreTableFactory.java   |   8 +-
 .../table/store/table/sink/MemoryTableWrite.java   |  11 +-
 .../flink/table/store/file/FileFormatTest.java     |   5 +-
 .../flink/table/store/file/TestFileStore.java      |  22 +-
 .../store/file/data/AppendOnlyWriterTest.java      |   4 +-
 .../store/file/data/DataFilePathFactoryTest.java   |  10 +-
 .../store/file/manifest/ManifestFileMetaTest.java  |   4 +-
 .../store/file/manifest/ManifestFileTest.java      |   4 +-
 .../store/file/manifest/ManifestListTest.java      |   4 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  11 +-
 .../store/file/utils/FileStorePathFactoryTest.java |   4 +-
 .../store/table/AppendOnlyFileStoreTableTest.java  |   8 +-
 .../ChangelogValueCountFileStoreTableTest.java     |   8 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  11 +-
 .../table/store/table/WritePreemptMemoryTest.java  |  13 +-
 .../table/store/mapred/TableStoreInputFormat.java  |   4 +-
 .../flink/table/store/FileStoreTestUtils.java      |   5 +-
 .../hive/TableStoreHiveStorageHandlerITCase.java   |  26 +-
 .../store/mapred/TableStoreRecordReaderTest.java   |  10 +-
 .../store/kafka/KafkaLogSerializationSchema.java   |   2 +-
 .../table/store/kafka/KafkaLogSinkProvider.java    |   4 +-
 .../table/store/kafka/KafkaLogSourceProvider.java  |   4 +-
 .../table/store/kafka/KafkaLogStoreFactory.java    |  44 +-
 .../flink/table/store/kafka/KafkaLogITCase.java    |   4 +-
 .../store/kafka/KafkaLogSerializationTest.java     |   2 +-
 .../flink/table/store/kafka/KafkaLogTestUtils.java |  12 +-
 .../table/store/spark/SimpleTableTestHelper.java   |   4 +-
 55 files changed, 838 insertions(+), 263 deletions(-)
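
For readers skimming the diff below, here is a minimal usage sketch (not part of the commit) of the consolidated options class it introduces. It only uses symbols visible in the diff (CoreOptions.BUCKET, CoreOptions.LOG_PREFIX, CoreOptions.LOG_SCAN, CoreOptions.LogStartupMode) plus Flink's Configuration and DelegatingConfiguration; the class name CoreOptionsUsageSketch and the bucket count are illustrative only.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DelegatingConfiguration;
    import org.apache.flink.table.store.CoreOptions;

    public class CoreOptionsUsageSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(CoreOptions.BUCKET, 4); // illustrative bucket count

            // File-store options are now read straight from CoreOptions
            // (previously FileStoreOptions).
            int numBucket = conf.get(CoreOptions.BUCKET);

            // Log options keep their "log." key prefix and are resolved through a
            // DelegatingConfiguration, matching the pattern used in
            // AbstractTableStoreFactory below (previously LogOptions).
            Configuration logOptions = new DelegatingConfiguration(conf, CoreOptions.LOG_PREFIX);
            CoreOptions.LogStartupMode startupMode = logOptions.get(CoreOptions.LOG_SCAN);

            System.out.println("buckets=" + numBucket + ", log scan mode=" + startupMode);
        }
    }

The pattern visible throughout the diff is that the old per-module option classes (FileStoreOptions, MergeTreeOptions, LogOptions) collapse into this single CoreOptions class, with log options distinguished only by the "log." key prefix.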

diff --git a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 9d326646..431e5c9b 100644
--- a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.factories.ManagedTableFactory;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
@@ -38,15 +38,15 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.WRITE_MODE;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.file.FileStoreOptions.WRITE_MODE;
 import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 
 /** Default implementation of {@link ManagedTableFactory}. */
 public class TableStoreManagedFactory extends AbstractTableStoreFactory
@@ -103,7 +103,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
     @Override
     public void onCreateTable(Context context, boolean ignoreIfExists) {
         Map<String, String> options = context.getCatalogTable().getOptions();
-        Path path = FileStoreOptions.path(options);
+        Path path = CoreOptions.path(options);
         try {
             if (path.getFileSystem().exists(path) && !ignoreIfExists) {
                 throw new TableException(
@@ -161,7 +161,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
 
     @Override
     public void onDropTable(Context context, boolean ignoreIfNotExists) {
-        Path path = FileStoreOptions.path(context.getCatalogTable().getOptions());
+        Path path = CoreOptions.path(context.getCatalogTable().getOptions());
         try {
             if (path.getFileSystem().exists(path)) {
                 path.getFileSystem().delete(path, true);
@@ -186,7 +186,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
         Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
-        if (APPEND_ONLY.toString().equals(newOptions.get(FileStoreOptions.WRITE_MODE.key()))) {
+        if (APPEND_ONLY.toString().equals(newOptions.get(CoreOptions.WRITE_MODE.key()))) {
             throw new UnsupportedOperationException(
                     "ALTER TABLE COMPACT is not yet supported for append only table.");
         }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index 8f6659c3..54de82ff 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -27,13 +27,11 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.sink.TableStoreSink;
 import org.apache.flink.table.store.connector.source.TableStoreSource;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
@@ -47,11 +45,11 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
-import static org.apache.flink.table.store.log.LogOptions.SCAN;
 import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
 
 /** Abstract table store factory to create table source and table sink. */
@@ -85,8 +83,8 @@ public abstract class AbstractTableStoreFactory
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
-        options.addAll(MergeTreeOptions.allOptions());
+        Set<ConfigOption<?>> options = CoreOptions.allOptions();
+        options.addAll(CoreOptions.allOptions());
         options.addAll(TableStoreFactoryOptions.allOptions());
         return options;
     }
@@ -115,18 +113,18 @@ public abstract class AbstractTableStoreFactory
 
     private static void validateFileStoreContinuous(Configuration options) {
         Configuration logOptions = new DelegatingConfiguration(options, LOG_PREFIX);
-        LogOptions.LogChangelogMode changelogMode = logOptions.get(CHANGELOG_MODE);
-        if (changelogMode == LogOptions.LogChangelogMode.UPSERT) {
+        CoreOptions.LogChangelogMode changelogMode = logOptions.get(LOG_CHANGELOG_MODE);
+        if (changelogMode == CoreOptions.LogChangelogMode.UPSERT) {
             throw new ValidationException(
                     "File store continuous reading dose not support upsert changelog mode.");
         }
-        LogOptions.LogConsistency consistency = logOptions.get(CONSISTENCY);
-        if (consistency == LogOptions.LogConsistency.EVENTUAL) {
+        CoreOptions.LogConsistency consistency = logOptions.get(LOG_CONSISTENCY);
+        if (consistency == CoreOptions.LogConsistency.EVENTUAL) {
             throw new ValidationException(
                     "File store continuous reading dose not support eventual consistency mode.");
         }
-        LogOptions.LogStartupMode startupMode = logOptions.get(SCAN);
-        if (startupMode == LogOptions.LogStartupMode.FROM_TIMESTAMP) {
+        CoreOptions.LogStartupMode startupMode = logOptions.get(LOG_SCAN);
+        if (startupMode == CoreOptions.LogStartupMode.FROM_TIMESTAMP) {
             throw new ValidationException(
                     "File store continuous reading dose not support from_timestamp scan mode, "
                             + "you can add timestamp filters instead.");
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 6077e57d..11d79dd4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -52,8 +52,8 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.table.store.CoreOptions.PATH;
 import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
 
 /** Catalog for table store. */
 public class FlinkCatalog extends AbstractCatalog {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 91141ece..d0d9cccb 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -24,8 +24,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
@@ -89,7 +89,7 @@ public class FlinkSinkBuilder {
     }
 
     public DataStreamSink<?> build() {
-        int numBucket = conf.get(FileStoreOptions.BUCKET);
+        int numBucket = conf.get(CoreOptions.BUCKET);
 
         BucketStreamPartitioner partitioner =
                 new BucketStreamPartitioner(numBucket, table.schema());
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 81ad0dba..0f4e5dac 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -28,9 +28,9 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
 import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
-import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -78,8 +78,10 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
         } else if (table instanceof ChangelogWithKeyFileStoreTable) {
             Configuration logOptions =
                     new DelegatingConfiguration(
-                            Configuration.fromMap(table.schema().options()), LogOptions.LOG_PREFIX);
-            if (logOptions.get(LogOptions.CHANGELOG_MODE) != LogOptions.LogChangelogMode.ALL) {
+                            Configuration.fromMap(table.schema().options()),
+                            CoreOptions.LOG_PREFIX);
+            if (logOptions.get(CoreOptions.LOG_CHANGELOG_MODE)
+                    != CoreOptions.LogChangelogMode.ALL) {
                 // with primary key, default sink upsert
                 ChangelogMode.Builder builder = ChangelogMode.newBuilder();
                 for (RowKind kind : requestedMode.getContainedKinds()) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index ebeb5482..396f6d94 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -30,10 +30,9 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.utils.Projection;
@@ -95,7 +94,7 @@ public class FlinkSourceBuilder {
     }
 
     private long discoveryIntervalMills() {
-        return conf.get(FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
+        return conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
     }
 
     private FileStoreSource buildFileSource(boolean isContinuous, boolean continuousScanLatest) {
@@ -112,18 +111,19 @@ public class FlinkSourceBuilder {
         if (isContinuous) {
             // TODO move validation to a dedicated method
             if (table.schema().primaryKeys().size() > 0
-                    && conf.get(FileStoreOptions.MERGE_ENGINE)
-                            == FileStoreOptions.MergeEngine.PARTIAL_UPDATE) {
+                    && conf.get(CoreOptions.MERGE_ENGINE)
+                            == CoreOptions.MergeEngine.PARTIAL_UPDATE) {
                 throw new ValidationException(
                         "Partial update continuous reading is not supported.");
             }
 
-            LogOptions.LogStartupMode startupMode =
-                    new DelegatingConfiguration(conf, LogOptions.LOG_PREFIX).get(LogOptions.SCAN);
+            CoreOptions.LogStartupMode startupMode =
+                    new DelegatingConfiguration(conf, CoreOptions.LOG_PREFIX)
+                            .get(CoreOptions.LOG_SCAN);
             if (logSourceProvider == null) {
-                return buildFileSource(true, startupMode == LogOptions.LogStartupMode.LATEST);
+                return buildFileSource(true, startupMode == CoreOptions.LogStartupMode.LATEST);
             } else {
-                if (startupMode != LogOptions.LogStartupMode.FULL) {
+                if (startupMode != CoreOptions.LogStartupMode.FULL) {
                     return logSourceProvider.createSource(null);
                 }
                 return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 5d03ee02..880c6a9c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -28,12 +28,12 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
 import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.predicate.PredicateConverter;
-import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -94,10 +94,12 @@ public class TableStoreSource
             // normalized nodes. See TableStoreSink.getChangelogMode validation.
             Configuration logOptions =
                     new DelegatingConfiguration(
-                            Configuration.fromMap(table.schema().options()), LogOptions.LOG_PREFIX);
-            return logOptions.get(LogOptions.CONSISTENCY) == LogOptions.LogConsistency.TRANSACTIONAL
-                            && logOptions.get(LogOptions.CHANGELOG_MODE)
-                                    == LogOptions.LogChangelogMode.ALL
+                            Configuration.fromMap(table.schema().options()),
+                            CoreOptions.LOG_PREFIX);
+            return logOptions.get(CoreOptions.LOG_CONSISTENCY)
+                                    == CoreOptions.LogConsistency.TRANSACTIONAL
+                            && logOptions.get(CoreOptions.LOG_CHANGELOG_MODE)
+                                    == CoreOptions.LogChangelogMode.ALL
                     ? ChangelogMode.all()
                     : ChangelogMode.upsert();
         } else {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
index a7fe99b3..be277d35 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.CoreOptions;
 
 import org.junit.Test;
 
@@ -29,9 +29,9 @@ import java.util.Collections;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.ratesWithTimestamp;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 
 /** Table store IT case when the managed table has computed column and watermark spec. */
 public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBase {
@@ -146,8 +146,8 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         null,
                         false,
                         Collections.singletonMap(
-                                LOG_PREFIX + LogOptions.SCAN.key(),
-                                LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+                                CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
                         "rate_by_to_currency IS NULL",
                         Arrays.asList(
                                 "corrected_rate_by_to_currency",
@@ -191,8 +191,8 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
                         false,
                         Collections.singletonMap(
-                                LOG_PREFIX + LogOptions.SCAN.key(),
-                                LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+                                CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
                         lateEventFilter,
                         Collections.emptyList(), // projection
                         Collections.singletonList(
@@ -215,8 +215,8 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         WatermarkSpec.of("ts1", "ts1 - INTERVAL '3' YEAR"),
                         false,
                         Collections.singletonMap(
-                                LOG_PREFIX + LogOptions.SCAN.key(),
-                                LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+                                CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
                         lateEventFilter.replaceAll("ts", "ts1"),
                         Arrays.asList("currency", "rate", "ts1"),
                         Collections.singletonList(
@@ -240,8 +240,8 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
                         false,
                         Collections.singletonMap(
-                                LOG_PREFIX + LogOptions.SCAN.key(),
-                                LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+                                CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
                         lateEventFilter,
                         Arrays.asList(
                                 "currency",
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
index cbbe4b4b..9e3be19f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -41,7 +41,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -79,9 +79,7 @@ public class CreateTableITCase extends TableStoreTestBase {
             assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
                     .isPresent();
             // check table store
-            assertThat(
-                            Paths.get(rootPath, FileStoreOptions.relativeTablePath(tableIdentifier))
-                                    .toFile())
+            assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile())
                     .exists();
             // check log store
             assertThat(topicExists(tableIdentifier.asSummaryString())).isEqualTo(enableLogStore);
@@ -131,9 +129,7 @@ public class CreateTableITCase extends TableStoreTestBase {
             }
         } else if (expectedResult.expectedMessage.startsWith("Failed to create file store path.")) {
             // failed when creating file store
-            Paths.get(rootPath, FileStoreOptions.relativeTablePath(tableIdentifier))
-                    .toFile()
-                    .mkdirs();
+            Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile().mkdirs();
         } else if (expectedResult.expectedMessage.startsWith("Failed to create kafka topic.")) {
             // failed when creating log store
             createTopicIfNotExists(tableIdentifier.asSummaryString(), BUCKET.defaultValue());
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
index 89631e42..e7278976 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -80,9 +80,7 @@ public class DropTableITCase extends TableStoreTestBase {
             assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
                     .isNotPresent();
             // check table store
-            assertThat(
-                            Paths.get(rootPath, FileStoreOptions.relativeTablePath(tableIdentifier))
-                                    .toFile())
+            assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile())
                     .doesNotExist();
             // check log store
             assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 43df8244..1eb0596c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -32,11 +32,11 @@ import org.apache.flink.table.data.conversion.DataStructureConverter;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
 import org.apache.flink.table.store.connector.sink.StoreSink;
 import org.apache.flink.table.store.connector.source.FileStoreSource;
 import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
@@ -70,9 +70,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.FILE_FORMAT;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.FILE_FORMAT;
+import static org.apache.flink.table.store.CoreOptions.PATH;
 import static org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem.retryArtificialException;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -365,7 +365,7 @@ public class FileStoreITCase extends AbstractTestBase {
             boolean noFail, TemporaryFolder temporaryFolder, int[] partitions, int[] primaryKey)
             throws Exception {
         Configuration options = buildConfiguration(noFail, temporaryFolder.newFolder());
-        Path tablePath = new FileStoreOptions(options).path();
+        Path tablePath = new CoreOptions(options).path();
         UpdateSchema updateSchema =
                 new UpdateSchema(
                         TABLE_TYPE,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 4e6e8c59..4e313402 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -51,9 +51,9 @@ import java.time.Duration;
 import java.util.List;
 
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.relativeTablePath;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /** ITCase for file store table api. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index 1e233b76..33d28349 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -30,10 +30,9 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
-import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
@@ -50,6 +49,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithChangelogRecords;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithInsertOnlyRecords;
 import static org.apache.flink.table.store.connector.ShowCreateUtil.buildInsertIntoQuery;
@@ -60,7 +60,6 @@ import static org.apache.flink.table.store.connector.ShowCreateUtil.createTableL
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Table store read write test base. */
@@ -72,7 +71,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
     protected void checkFileStorePath(
             StreamTableEnvironment tEnv, String managedTable, @Nullable String partitionList) {
         String relativeFilePath =
-                FileStoreOptions.relativeTablePath(
+                CoreOptions.relativeTablePath(
                         ObjectIdentifier.of(
                                 tEnv.getCurrentCatalog(), tEnv.getCurrentDatabase(), managedTable));
         // check snapshot file path
@@ -232,8 +231,8 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
             throws Exception {
         Map<String, String> hints = new HashMap<>();
         hints.put(
-                LOG_PREFIX + LogOptions.SCAN.key(),
-                LogOptions.LogStartupMode.LATEST.name().toLowerCase());
+                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+                CoreOptions.LogStartupMode.LATEST.name().toLowerCase());
         collectAndCheckUnderSameEnv(
                         true,
                         true,
@@ -259,8 +258,9 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
             List<Row> expected)
             throws Exception {
         Map<String, String> hints = new HashMap<>();
-        hints.put(LOG_PREFIX + LogOptions.SCAN.key(), "from-timestamp");
-        hints.put(LOG_PREFIX + LogOptions.SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
+        hints.put(LOG_PREFIX + CoreOptions.LOG_SCAN.key(), "from-timestamp");
+        hints.put(
+                LOG_PREFIX + CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
         collectAndCheckUnderSameEnv(
                         true,
                         true,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
index 996917ca..d3efcf5f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -38,7 +38,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index 252f9828..31832f59 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -28,9 +28,8 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
-import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -56,22 +55,22 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.path;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.file.FileStoreOptions.path;
 import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PART_TYPE;
 import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
 import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
 import static org.apache.flink.table.store.file.TestKeyValueGenerator.getPrimaryKeys;
 import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -194,13 +193,13 @@ public class TableStoreManagedFactoryTest {
         // mix invalid key and leave value to empty to emphasize the deferred validation
         Map<String, String> expectedLogOptions =
                 of(
-                        LogOptions.SCAN.key(),
+                        CoreOptions.LOG_SCAN.key(),
                         "",
-                        LogOptions.RETENTION.key(),
+                        CoreOptions.LOG_RETENTION.key(),
                         "",
                         "dummy.key",
                         "",
-                        LogOptions.CHANGELOG_MODE.key(),
+                        CoreOptions.LOG_CHANGELOG_MODE.key(),
                         "");
         Map<String, String> enrichedOptions =
                 addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
@@ -273,7 +272,7 @@ public class TableStoreManagedFactoryTest {
         context =
                 createEnrichedContext(
                         Collections.singletonMap(
-                                FileStoreOptions.WRITE_MODE.key(), APPEND_ONLY.toString()));
+                                CoreOptions.WRITE_MODE.key(), APPEND_ONLY.toString()));
         assertThatThrownBy(
                         () ->
                                 tableStoreManagedFactory.onCompactTable(
@@ -325,8 +324,8 @@ public class TableStoreManagedFactoryTest {
                         sharedTempDir.toString(),
                         LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
                         "localhost:9092",
-                        LOG_PREFIX + CONSISTENCY.key(),
-                        CONSISTENCY.defaultValue().name());
+                        LOG_PREFIX + LOG_CONSISTENCY.key(),
+                        LOG_CONSISTENCY.defaultValue().name());
 
         // set configuration under session level
         Arguments arg1 =
@@ -342,7 +341,7 @@ public class TableStoreManagedFactoryTest {
         // set both session and table level configuration to test options combination
         Map<String, String> tableOptions = new HashMap<>(enrichedOptions);
         tableOptions.remove(ROOT_PATH.key());
-        tableOptions.remove(CONSISTENCY.key());
+        tableOptions.remove(LOG_CONSISTENCY.key());
         Arguments arg3 =
                 Arguments.of(
                         addPrefix(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index 558f6d25..1f321af9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -31,7 +31,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -50,11 +50,11 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 
 /** End-to-end test base for table store. */
 public abstract class TableStoreTestBase extends KafkaTableTestBase {
@@ -154,7 +154,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
 
     protected void deleteTablePath() {
         FileUtils.deleteQuietly(
-                Paths.get(rootPath, FileStoreOptions.relativeTablePath(tableIdentifier)).toFile());
+                Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile());
     }
 
     /** Expected result wrapper. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 201f1692..59465a73 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.connector.sink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
@@ -28,7 +29,6 @@ import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
 import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
 import org.apache.flink.table.store.kafka.KafkaLogTestUtils;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
-import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.types.Row;
 
@@ -102,10 +102,10 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
                 KafkaLogTestUtils.testContext(
                         name,
                         getBootstrapServers(),
-                        LogOptions.LogChangelogMode.AUTO,
+                        CoreOptions.LogChangelogMode.AUTO,
                         transaction
-                                ? LogOptions.LogConsistency.TRANSACTIONAL
-                                : LogOptions.LogConsistency.EVENTUAL,
+                                ? CoreOptions.LogConsistency.TRANSACTIONAL
+                                : CoreOptions.LogConsistency.EVENTUAL,
                         TABLE_TYPE,
                         hasPk ? new int[] {2} : new int[0]);
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index a99d05c0..4a4be635 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -24,12 +24,11 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
@@ -81,7 +80,7 @@ public class TestChangelogDataReadWrite {
                         tablePath,
                         RowType.of(new IntType()),
                         "default",
-                        FileStoreOptions.FILE_FORMAT.defaultValue());
+                        CoreOptions.FILE_FORMAT.defaultValue());
         this.snapshotManager = new SnapshotManager(new Path(root));
         this.service = service;
     }
@@ -140,7 +139,7 @@ public class TestChangelogDataReadWrite {
     }
 
     public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRowData partition, int bucket) {
-        MergeTreeOptions options = new MergeTreeOptions(new Configuration());
+        CoreOptions options = new CoreOptions(new Configuration());
         RecordWriter<KeyValue> writer =
                 new KeyValueFileStoreWrite(
                                 new SchemaManager(tablePath),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
new file mode 100644
index 00000000..1df27218
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.InlineElement;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
+
+/** Core options for table store. */
+public class CoreOptions implements Serializable {
+    public static final String LOG_PREFIX = "log.";
+    public static final String TABLE_STORE_PREFIX = "table-store.";
+
+    public static final ConfigOption<Integer> BUCKET =
+            ConfigOptions.key("bucket")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription("Bucket number for file store.");
+
+    public static final ConfigOption<String> PATH =
+            ConfigOptions.key("path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The file path of this table in the filesystem.");
+
+    public static final ConfigOption<String> FILE_FORMAT =
+            ConfigOptions.key("file.format")
+                    .stringType()
+                    .defaultValue("orc")
+                    .withDescription("Specify the message format of data files.");
+
+    public static final ConfigOption<String> MANIFEST_FORMAT =
+            ConfigOptions.key("manifest.format")
+                    .stringType()
+                    .defaultValue("avro")
+                    .withDescription("Specify the message format of manifest files.");
+
+    public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
+            ConfigOptions.key("manifest.target-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(8))
+                    .withDescription("Suggested file size of a manifest file.");
+
+    public static final ConfigOption<Integer> MANIFEST_MERGE_MIN_COUNT =
+            ConfigOptions.key("manifest.merge-min-count")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription(
+                            "To avoid frequent manifest merges, this parameter specifies the minimum number "
+                                    + "of ManifestFileMeta to merge.");
+
+    public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
+            key("partition.default-name")
+                    .stringType()
+                    .defaultValue("__DEFAULT_PARTITION__")
+                    .withDescription(
+                            "The default partition name in case the dynamic partition"
+                                    + " column value is null/empty string.");
+
+    public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN =
+            ConfigOptions.key("snapshot.num-retained.min")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription("The minimum number of completed snapshots to retain.");
+
+    public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MAX =
+            ConfigOptions.key("snapshot.num-retained.max")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription("The maximum number of completed snapshots to retain.");
+
+    public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
+            ConfigOptions.key("snapshot.time-retained")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription("The maximum time of completed snapshots to retain.");
+
+    public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
+            ConfigOptions.key("continuous.discovery-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription("The discovery interval of continuous reading.");
+
+    public static final ConfigOption<CoreOptions.MergeEngine> MERGE_ENGINE =
+            ConfigOptions.key("merge-engine")
+                    .enumType(CoreOptions.MergeEngine.class)
+                    .defaultValue(CoreOptions.MergeEngine.DEDUPLICATE)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specify the merge engine for table with primary key.")
+                                    .linebreak()
+                                    .list(
+                                            formatEnumOption(CoreOptions.MergeEngine.DEDUPLICATE),
+                                            formatEnumOption(
+                                                    CoreOptions.MergeEngine.PARTIAL_UPDATE))
+                                    .build());
+
+    public static final ConfigOption<WriteMode> WRITE_MODE =
+            ConfigOptions.key("write-mode")
+                    .enumType(WriteMode.class)
+                    .defaultValue(WriteMode.CHANGE_LOG)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specify the write mode for table.")
+                                    .linebreak()
+                                    .list(
+                                            formatEnumOption(WriteMode.APPEND_ONLY),
+                                            formatEnumOption(WriteMode.CHANGE_LOG))
+                                    .build());
+
+    public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
+            ConfigOptions.key("source.split.target-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription("Target size of a source split when scanning a bucket.");
+
+    public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
+            ConfigOptions.key("source.split.open-file-cost")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(4))
+                    .withDescription(
+                            "Open file cost of a source file. It is used to avoid reading"
+                                    + " too many files with a source split, which can be very slow.");
+
+    public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
+            ConfigOptions.key("write-buffer-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("256 mb"))
+                    .withDescription(
+                            "Amount of data to build up in memory before converting to a sorted on-disk file.");
+
+    public static final ConfigOption<MemorySize> PAGE_SIZE =
+            ConfigOptions.key("page-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("64 kb"))
+                    .withDescription("Memory page size.");
+
+    public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
+            ConfigOptions.key("target-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription("Target size of a file.");
+
+    public static final ConfigOption<Integer> NUM_SORTED_RUNS_COMPACTION_TRIGGER =
+            ConfigOptions.key("num-sorted-run.compaction-trigger")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of sorted runs that triggers compaction. Includes level 0 files (one sorted run per file) "
+                                    + "and high-level runs (one sorted run per level).");
+
+    public static final ConfigOption<Integer> NUM_SORTED_RUNS_STOP_TRIGGER =
+            ConfigOptions.key("num-sorted-run.stop-trigger")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The number of sorted runs that triggers the stopping of writes.");
+
+    public static final ConfigOption<Integer> NUM_LEVELS =
+            ConfigOptions.key("num-levels")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Total number of levels, for example, if there are 3 levels, they are levels 0, 1, and 2.");
+
+    public static final ConfigOption<Boolean> COMMIT_FORCE_COMPACT =
+            ConfigOptions.key("commit.force-compact")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to force a compaction before commit.");
+
+    public static final ConfigOption<Integer> COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT =
+            ConfigOptions.key("compaction.max-size-amplification-percent")
+                    .intType()
+                    .defaultValue(200)
+                    .withDescription(
+                            "The size amplification is defined as the amount (in percentage) of additional storage "
+                                    + "needed to store a single byte of data in the merge tree.");
+
+    public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO =
+            ConfigOptions.key("compaction.size-ratio")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) "
+                                    + "size is 1% smaller than the next sorted run's size, then include the next sorted run "
+                                    + "into this candidate set.");
+
+    public static final ConfigOption<Boolean> CHANGELOG_FILE =
+            ConfigOptions.key("changelog-file")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to double write to a changelog file when flushing the memory table. "
+                                    + "This changelog file keeps the order of data input and the details of data changes, "
+                                    + "and it can be read directly during stream reads.");
+
+    public static final ConfigOption<LogStartupMode> LOG_SCAN =
+            ConfigOptions.key("scan")
+                    .enumType(LogStartupMode.class)
+                    .defaultValue(LogStartupMode.FULL)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specify the startup mode for log consumer.")
+                                    .linebreak()
+                                    .list(
+                                            formatEnumOption(LogStartupMode.FULL),
+                                            formatEnumOption(LogStartupMode.LATEST),
+                                            formatEnumOption(LogStartupMode.FROM_TIMESTAMP))
+                                    .build());
+
+    public static final ConfigOption<Long> LOG_SCAN_TIMESTAMP_MILLS =
+            ConfigOptions.key("scan.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"from-timestamp\" scan mode.");
+
+    public static final ConfigOption<Duration> LOG_RETENTION =
+            ConfigOptions.key("retention")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "It means how long the changelog will be kept. The default value is taken from the log system cluster.");
+
+    public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
+            ConfigOptions.key("consistency")
+                    .enumType(LogConsistency.class)
+                    .defaultValue(LogConsistency.TRANSACTIONAL)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specify the log consistency mode for table.")
+                                    .linebreak()
+                                    .list(
+                                            formatEnumOption(LogConsistency.TRANSACTIONAL),
+                                            formatEnumOption(LogConsistency.EVENTUAL))
+                                    .build());
+
+    public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE =
+            ConfigOptions.key("changelog-mode")
+                    .enumType(LogChangelogMode.class)
+                    .defaultValue(LogChangelogMode.AUTO)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specify the log changelog mode for table.")
+                                    .linebreak()
+                                    .list(
+                                            formatEnumOption(LogChangelogMode.AUTO),
+                                            formatEnumOption(LogChangelogMode.ALL),
+                                            formatEnumOption(LogChangelogMode.UPSERT))
+                                    .build());
+
+    public static final ConfigOption<String> LOG_KEY_FORMAT =
+            ConfigOptions.key("key.format")
+                    .stringType()
+                    .defaultValue("json")
+                    .withDescription(
+                            "Specify the key message format of the log system for tables with a primary key.");
+
+    public static final ConfigOption<String> LOG_FORMAT =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .defaultValue("debezium-json")
+                    .withDescription("Specify the message format of log system.");
+
+    public long writeBufferSize;
+
+    public int pageSize;
+
+    public long targetFileSize;
+
+    public int numSortedRunCompactionTrigger;
+
+    public int numSortedRunStopTrigger;
+
+    public int numLevels;
+
+    public boolean commitForceCompact;
+
+    public int maxSizeAmplificationPercent;
+
+    public int sizeRatio;
+
+    public boolean enableChangelogFile;
+
+    private Configuration options;
+
+    public CoreOptions(
+            long writeBufferSize,
+            int pageSize,
+            long targetFileSize,
+            int numSortedRunCompactionTrigger,
+            int numSortedRunStopTrigger,
+            Integer numLevels,
+            boolean commitForceCompact,
+            int maxSizeAmplificationPercent,
+            int sizeRatio,
+            boolean enableChangelogFile) {
+        this.writeBufferSize = writeBufferSize;
+        this.pageSize = pageSize;
+        this.targetFileSize = targetFileSize;
+        this.numSortedRunCompactionTrigger = numSortedRunCompactionTrigger;
+        this.numSortedRunStopTrigger =
+                Math.max(numSortedRunCompactionTrigger, numSortedRunStopTrigger);
+        // By default, this ensures that compaction results do not fall to level 0, but to at
+        // least level 1.
+        this.numLevels = numLevels == null ? numSortedRunCompactionTrigger + 1 : numLevels;
+        this.commitForceCompact = commitForceCompact;
+        this.maxSizeAmplificationPercent = maxSizeAmplificationPercent;
+        this.sizeRatio = sizeRatio;
+        this.enableChangelogFile = enableChangelogFile;
+    }
+
+    public CoreOptions(Configuration config) {
+        this(
+                config.get(WRITE_BUFFER_SIZE).getBytes(),
+                (int) config.get(PAGE_SIZE).getBytes(),
+                config.get(TARGET_FILE_SIZE).getBytes(),
+                config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER),
+                config.get(NUM_SORTED_RUNS_STOP_TRIGGER),
+                config.get(NUM_LEVELS),
+                config.get(COMMIT_FORCE_COMPACT),
+                config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT),
+                config.get(COMPACTION_SIZE_RATIO),
+                config.get(CHANGELOG_FILE));
+        this.options = config;
+        Preconditions.checkArgument(
+                snapshotNumRetainMin() > 0,
+                SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
+        Preconditions.checkArgument(
+                snapshotNumRetainMin() <= snapshotNumRetainMax(),
+                SNAPSHOT_NUM_RETAINED_MIN.key()
+                        + " should not be larger than "
+                        + SNAPSHOT_NUM_RETAINED_MAX.key());
+    }
+
+    public CoreOptions(Map<String, String> options) {
+        this(Configuration.fromMap(options));
+    }
+
+    public static Set<ConfigOption<?>> allOptions() {
+        Set<ConfigOption<?>> allOptions = new HashSet<>();
+        allOptions.add(BUCKET);
+        allOptions.add(PATH);
+        allOptions.add(FILE_FORMAT);
+        allOptions.add(MANIFEST_FORMAT);
+        allOptions.add(MANIFEST_TARGET_FILE_SIZE);
+        allOptions.add(MANIFEST_MERGE_MIN_COUNT);
+        allOptions.add(PARTITION_DEFAULT_NAME);
+        allOptions.add(SNAPSHOT_NUM_RETAINED_MIN);
+        allOptions.add(SNAPSHOT_NUM_RETAINED_MAX);
+        allOptions.add(SNAPSHOT_TIME_RETAINED);
+        allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL);
+        allOptions.add(MERGE_ENGINE);
+        allOptions.add(WRITE_MODE);
+        allOptions.add(SOURCE_SPLIT_TARGET_SIZE);
+        allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST);
+        allOptions.add(WRITE_BUFFER_SIZE);
+        allOptions.add(PAGE_SIZE);
+        allOptions.add(TARGET_FILE_SIZE);
+        allOptions.add(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
+        allOptions.add(NUM_SORTED_RUNS_STOP_TRIGGER);
+        allOptions.add(NUM_LEVELS);
+        allOptions.add(COMMIT_FORCE_COMPACT);
+        allOptions.add(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
+        allOptions.add(COMPACTION_SIZE_RATIO);
+        return allOptions;
+    }
+
+    public int bucket() {
+        return options.get(BUCKET);
+    }
+
+    public Path path() {
+        return path(options.toMap());
+    }
+
+    public static Path path(Map<String, String> options) {
+        return new Path(options.get(PATH.key()));
+    }
+
+    public static Path path(Configuration options) {
+        return new Path(options.get(PATH));
+    }
+
+    public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
+        return String.format(
+                "%s.catalog/%s.db/%s",
+                tableIdentifier.getCatalogName(),
+                tableIdentifier.getDatabaseName(),
+                tableIdentifier.getObjectName());
+    }
+
+    public FileFormat fileFormat() {
+        return FileFormat.fromTableOptions(options, FILE_FORMAT);
+    }
+
+    public FileFormat manifestFormat() {
+        return FileFormat.fromTableOptions(options, MANIFEST_FORMAT);
+    }
+
+    public MemorySize manifestTargetSize() {
+        return options.get(MANIFEST_TARGET_FILE_SIZE);
+    }
+
+    public String partitionDefaultName() {
+        return options.get(PARTITION_DEFAULT_NAME);
+    }
+
+    public int snapshotNumRetainMin() {
+        return options.get(SNAPSHOT_NUM_RETAINED_MIN);
+    }
+
+    public int snapshotNumRetainMax() {
+        return options.get(SNAPSHOT_NUM_RETAINED_MAX);
+    }
+
+    public Duration snapshotTimeRetain() {
+        return options.get(SNAPSHOT_TIME_RETAINED);
+    }
+
+    public int manifestMergeMinCount() {
+        return options.get(MANIFEST_MERGE_MIN_COUNT);
+    }
+
+    public long splitTargetSize() {
+        return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
+    }
+
+    public long splitOpenFileCost() {
+        return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
+    }
+
+    /** Specifies the merge engine for table with primary key. */
+    public enum MergeEngine implements DescribedEnum {
+        DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
+
+        PARTIAL_UPDATE("partial-update", "Partial update non-null fields.");
+
+        private final String value;
+        private final String description;
+
+        MergeEngine(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+
+    /** Specifies the startup mode for log consumer. */
+    public enum LogStartupMode implements DescribedEnum {
+        FULL(
+                "full",
+                "Perform a snapshot on the table upon first startup,"
+                        + " and continue to read the latest changes."),
+
+        LATEST("latest", "Start from the latest."),
+
+        FROM_TIMESTAMP("from-timestamp", "Start from user-supplied timestamp.");
+
+        private final String value;
+        private final String description;
+
+        LogStartupMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+
+    /** Specifies the log consistency mode for table. */
+    public enum LogConsistency implements DescribedEnum {
+        TRANSACTIONAL(
+                "transactional",
+                "Only the data after the checkpoint can be seen by readers; the latency depends on the checkpoint interval."),
+
+        EVENTUAL(
+                "eventual",
+                "Immediate data visibility, you may see some intermediate states, "
+                        + "but eventually the right results will be produced. This only works for tables with a primary key.");
+
+        private final String value;
+        private final String description;
+
+        LogConsistency(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+
+    /** Specifies the log changelog mode for table. */
+    public enum LogChangelogMode implements DescribedEnum {
+        AUTO("auto", "Upsert for table with primary key, all for table without primary key."),
+
+        ALL("all", "The log system stores all changes including UPDATE_BEFORE."),
+
+        UPSERT(
+                "upsert",
+                "The log system does not store the UPDATE_BEFORE changes; the consuming job"
+                        + " will automatically add a normalize node, relying on state"
+                        + " to generate the required UPDATE_BEFORE.");
+
+        private final String value;
+        private final String description;
+
+        LogChangelogMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+}
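
For reviewers: a minimal usage sketch of the merged options class, assuming a local
table path. It only uses constructors and accessors declared above; the class name
and the concrete values are illustrative and not part of this commit.

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.CoreOptions;

    import java.util.HashMap;
    import java.util.Map;

    public class CoreOptionsExample {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put(CoreOptions.PATH.key(), "file:///tmp/my_table");   // "path"
            tableOptions.put(CoreOptions.BUCKET.key(), "4");                    // "bucket"
            tableOptions.put(CoreOptions.TARGET_FILE_SIZE.key(), "256 mb");     // "target-file-size"

            // The Configuration-based constructor validates the snapshot retention options
            // and derives merge-tree defaults (e.g. numLevels, numSortedRunStopTrigger).
            CoreOptions options = new CoreOptions(tableOptions);

            Path tablePath = options.path();            // file:///tmp/my_table
            int numBuckets = options.bucket();          // 4
            long targetBytes = options.targetFileSize;  // 256 mb parsed into bytes
        }
    }
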
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
index 4e3338f5..ff943a19 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.file;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
@@ -37,14 +38,14 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
 
     protected final SchemaManager schemaManager;
     protected final long schemaId;
-    protected final FileStoreOptions options;
+    protected final CoreOptions options;
     protected final String user;
     protected final RowType partitionType;
 
     public AbstractFileStore(
             SchemaManager schemaManager,
             long schemaId,
-            FileStoreOptions options,
+            CoreOptions options,
             String user,
             RowType partitionType) {
         this.schemaManager = schemaManager;
@@ -88,7 +89,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
         return partitionType;
     }
 
-    public FileStoreOptions options() {
+    public CoreOptions options() {
         return options;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index c9a8dec2..6236a39f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.file;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
 import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
 import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreWrite;
@@ -33,7 +34,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
     public AppendOnlyFileStore(
             SchemaManager schemaManager,
             long schemaId,
-            FileStoreOptions options,
+            CoreOptions options,
             String user,
             RowType partitionType,
             RowType rowType) {
@@ -61,7 +62,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
                 pathFactory(),
                 snapshotManager(),
                 newScan(true),
-                options.mergeTreeOptions().targetFileSize);
+                options.targetFileSize);
     }
 
     private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 6d2dc3a8..fb751a73 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.file;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
@@ -41,7 +42,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
     public KeyValueFileStore(
             SchemaManager schemaManager,
             long schemaId,
-            FileStoreOptions options,
+            CoreOptions options,
             String user,
             RowType partitionType,
             RowType keyType,
@@ -85,7 +86,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 pathFactory(),
                 snapshotManager(),
                 newScan(true),
-                options.mergeTreeOptions());
+                options);
     }
 
     private KeyValueFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 3c628af0..b06df389 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -20,12 +20,12 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.mergetree.Levels;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
@@ -60,7 +60,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
     private final DataFileWriter.Factory dataFileWriterFactory;
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
     private final MergeFunction mergeFunction;
-    private final MergeTreeOptions options;
+    private final CoreOptions options;
 
     public KeyValueFileStoreWrite(
             SchemaManager schemaManager,
@@ -73,7 +73,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
             FileStorePathFactory pathFactory,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
-            MergeTreeOptions options) {
+            CoreOptions options) {
         super(snapshotManager, scan);
         this.dataFileReaderFactory =
                 new DataFileReader.Factory(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index e97b467d..7fc48bc9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.PartitionPathUtils;
@@ -60,7 +60,7 @@ public class FileStorePathFactory {
                 root,
                 RowType.of(),
                 PARTITION_DEFAULT_NAME.defaultValue(),
-                FileStoreOptions.FILE_FORMAT.defaultValue());
+                CoreOptions.FILE_FORMAT.defaultValue());
     }
 
     // for tables without partition, partitionType should be a row type with 0 columns (not null)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 7032ea06..4ae4db06 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
@@ -83,8 +84,8 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
             TableFactoryHelper helper) {
         DecodingFormat<DeserializationSchema<RowData>> format =
                 helper.discoverDecodingFormat(
-                        DeserializationFormatFactory.class, LogOptions.KEY_FORMAT);
-        validateKeyFormat(format, helper.getOptions().get(LogOptions.KEY_FORMAT));
+                        DeserializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT);
+        validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT));
         return format;
     }
 
@@ -92,8 +93,8 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
             TableFactoryHelper helper) {
         EncodingFormat<SerializationSchema<RowData>> format =
                 helper.discoverEncodingFormat(
-                        SerializationFormatFactory.class, LogOptions.KEY_FORMAT);
-        validateKeyFormat(format, helper.getOptions().get(LogOptions.KEY_FORMAT));
+                        SerializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT);
+        validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT));
         return format;
     }
 
@@ -101,16 +102,17 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
             TableFactoryHelper helper) {
         DecodingFormat<DeserializationSchema<RowData>> format =
                 helper.discoverDecodingFormat(
-                        DeserializationFormatFactory.class, LogOptions.FORMAT);
-        validateValueFormat(format, helper.getOptions().get(LogOptions.FORMAT));
+                        DeserializationFormatFactory.class, CoreOptions.LOG_FORMAT);
+        validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT));
         return format;
     }
 
     static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
             TableFactoryHelper helper) {
         EncodingFormat<SerializationSchema<RowData>> format =
-                helper.discoverEncodingFormat(SerializationFormatFactory.class, LogOptions.FORMAT);
-        validateValueFormat(format, helper.getOptions().get(LogOptions.FORMAT));
+                helper.discoverEncodingFormat(
+                        SerializationFormatFactory.class, CoreOptions.LOG_FORMAT);
+        validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT));
         return format;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 4fdede94..103dc3a9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.AppendOnlyFileStore;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
 import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
@@ -59,7 +59,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
                 new AppendOnlyFileStore(
                         schemaManager,
                         tableSchema.id(),
-                        new FileStoreOptions(tableSchema.options()),
+                        new CoreOptions(tableSchema.options()),
                         user,
                         tableSchema.logicalPartitionType(),
                         tableSchema.logicalRowType());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index d622f6ec..2830700f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueFileStore;
 import org.apache.flink.table.store.file.WriteMode;
@@ -67,7 +67,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
                 new KeyValueFileStore(
                         schemaManager,
                         tableSchema.id(),
-                        new FileStoreOptions(tableSchema.options()),
+                        new CoreOptions(tableSchema.options()),
                         user,
                         tableSchema.logicalPartitionType(),
                         tableSchema.logicalRowType(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 1c32917f..585caffd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueFileStore;
 import org.apache.flink.table.store.file.WriteMode;
@@ -80,7 +80,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
 
         Configuration conf = Configuration.fromMap(tableSchema.options());
 
-        FileStoreOptions.MergeEngine mergeEngine = conf.get(FileStoreOptions.MERGE_ENGINE);
+        CoreOptions.MergeEngine mergeEngine = conf.get(CoreOptions.MERGE_ENGINE);
         MergeFunction mergeFunction;
         switch (mergeEngine) {
             case DEDUPLICATE:
@@ -102,7 +102,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
                 new KeyValueFileStore(
                         schemaManager,
                         tableSchema.id(),
-                        new FileStoreOptions(conf),
+                        new CoreOptions(conf),
                         user,
                         tableSchema.logicalPartitionType(),
                         keyType,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index 82a1a039..440222f8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -20,14 +20,14 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 
 import java.util.UUID;
 
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.PATH;
 
 /** Factory to create {@link FileStoreTable}. */
 public class FileStoreTableFactory {
@@ -43,7 +43,7 @@ public class FileStoreTableFactory {
     }
 
     public static FileStoreTable create(Configuration conf, String user) {
-        Path tablePath = FileStoreOptions.path(conf);
+        Path tablePath = CoreOptions.path(conf);
         TableSchema tableSchema =
                 new SchemaManager(tablePath)
                         .latest()
@@ -69,7 +69,7 @@ public class FileStoreTableFactory {
         tableSchema = tableSchema.copy(newOptions.toMap());
 
         SchemaManager schemaManager = new SchemaManager(tablePath);
-        if (newOptions.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
+        if (newOptions.get(CoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
             return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema, user);
         } else {
             if (tableSchema.primaryKeys().isEmpty()) {
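
A hedged sketch of creating a table through the factory with the renamed option keys;
it assumes a table schema already exists under the given path (create(..) reads the
latest TableSchema), and the path, user name, and class name are placeholders.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;
    import org.apache.flink.table.store.file.WriteMode;
    import org.apache.flink.table.store.table.FileStoreTable;
    import org.apache.flink.table.store.table.FileStoreTableFactory;

    public class FileStoreTableFactoryExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(CoreOptions.PATH, "file:///tmp/my_table");       // schema must already exist here
            conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);  // selects AppendOnlyFileStoreTable

            FileStoreTable table = FileStoreTableFactory.create(conf, "example-user");
        }
    }
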
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
index 215840a1..1e997477 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.table.store.table.sink;
 
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.memory.MemoryPoolFactory;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 
@@ -39,15 +38,11 @@ public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T> {
     private final MemoryPoolFactory memoryPoolFactory;
 
     protected MemoryTableWrite(
-            FileStoreWrite<T> write,
-            SinkRecordConverter recordConverter,
-            FileStoreOptions options) {
+            FileStoreWrite<T> write, SinkRecordConverter recordConverter, CoreOptions options) {
         super(write, recordConverter);
 
-        MergeTreeOptions mergeTreeOptions = options.mergeTreeOptions();
         HeapMemorySegmentPool memoryPool =
-                new HeapMemorySegmentPool(
-                        mergeTreeOptions.writeBufferSize, mergeTreeOptions.pageSize);
+                new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize);
         this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, this::memoryOwners);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
index e46fd09f..e096cf00 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -97,8 +98,8 @@ public class FileFormatTest {
 
     public FileFormat createFileFormat(String codec) {
         Configuration tableOptions = new Configuration();
-        tableOptions.set(FileStoreOptions.FILE_FORMAT, "avro");
+        tableOptions.set(CoreOptions.FILE_FORMAT, "avro");
         tableOptions.setString("avro.codec", codec);
-        return FileFormat.fromTableOptions(tableOptions, FileStoreOptions.FILE_FORMAT);
+        return FileFormat.fromTableOptions(tableOptions, CoreOptions.FILE_FORMAT);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index fb290140..a7ff2a86 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -31,7 +32,6 @@ import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
@@ -93,26 +93,26 @@ public class TestFileStore extends KeyValueFileStore {
             MergeFunction mergeFunction) {
         Configuration conf = new Configuration();
 
-        conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, WRITE_BUFFER_SIZE);
-        conf.set(MergeTreeOptions.PAGE_SIZE, PAGE_SIZE);
-        conf.set(MergeTreeOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
+        conf.set(CoreOptions.WRITE_BUFFER_SIZE, WRITE_BUFFER_SIZE);
+        conf.set(CoreOptions.PAGE_SIZE, PAGE_SIZE);
+        conf.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
 
         conf.set(
-                FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
+                CoreOptions.MANIFEST_TARGET_FILE_SIZE,
                 MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
 
-        conf.set(FileStoreOptions.FILE_FORMAT, format);
-        conf.set(FileStoreOptions.MANIFEST_FORMAT, format);
-        conf.set(FileStoreOptions.PATH, root);
-        conf.set(FileStoreOptions.BUCKET, numBuckets);
+        conf.set(CoreOptions.FILE_FORMAT, format);
+        conf.set(CoreOptions.MANIFEST_FORMAT, format);
+        conf.set(CoreOptions.PATH, root);
+        conf.set(CoreOptions.BUCKET, numBuckets);
 
         return new TestFileStore(
-                root, new FileStoreOptions(conf), partitionType, keyType, valueType, mergeFunction);
+                root, new CoreOptions(conf), partitionType, keyType, valueType, mergeFunction);
     }
 
     private TestFileStore(
             String root,
-            FileStoreOptions options,
+            CoreOptions options,
             RowType partitionType,
             RowType keyType,
             RowType valueType,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index 5cfb2f73..3fdeb330 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.writer.RecordWriter;
@@ -215,7 +215,7 @@ public class AppendOnlyWriterTest {
                 new Path(tempDir.toString()),
                 "dt=" + PART,
                 1,
-                FileStoreOptions.FILE_FORMAT.defaultValue());
+                CoreOptions.FILE_FORMAT.defaultValue());
     }
 
     private RecordWriter<RowData> createWriter(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFilePathFactoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFilePathFactoryTest.java
index cc4448b0..ad129ede 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFilePathFactoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFilePathFactoryTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.file.data;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -38,7 +38,7 @@ public class DataFilePathFactoryTest {
                         new Path(tempDir.toString()),
                         "",
                         123,
-                        FileStoreOptions.FILE_FORMAT.defaultValue());
+                        CoreOptions.FILE_FORMAT.defaultValue());
         String uuid = pathFactory.uuid();
 
         for (int i = 0; i < 20; i++) {
@@ -51,7 +51,7 @@ public class DataFilePathFactoryTest {
                                             + "-"
                                             + i
                                             + "."
-                                            + FileStoreOptions.FILE_FORMAT.defaultValue()));
+                                            + CoreOptions.FILE_FORMAT.defaultValue()));
         }
         assertThat(pathFactory.toPath("my-data-file-name"))
                 .isEqualTo(new Path(tempDir.toString() + "/bucket-123/my-data-file-name"));
@@ -64,7 +64,7 @@ public class DataFilePathFactoryTest {
                         new Path(tempDir.toString()),
                         "dt=20211224",
                         123,
-                        FileStoreOptions.FILE_FORMAT.defaultValue());
+                        CoreOptions.FILE_FORMAT.defaultValue());
         String uuid = pathFactory.uuid();
 
         for (int i = 0; i < 20; i++) {
@@ -77,7 +77,7 @@ public class DataFilePathFactoryTest {
                                             + "-"
                                             + i
                                             + "."
-                                            + FileStoreOptions.FILE_FORMAT.defaultValue()));
+                                            + CoreOptions.FILE_FORMAT.defaultValue()));
         }
         assertThat(pathFactory.toPath("my-data-file-name"))
                 .isEqualTo(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 43811b7a..8bb8194f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
@@ -146,7 +146,7 @@ public class ManifestFileMetaTest {
                                 new Path(path),
                                 PARTITION_TYPE,
                                 "default",
-                                FileStoreOptions.FILE_FORMAT.defaultValue()),
+                                CoreOptions.FILE_FORMAT.defaultValue()),
                         Long.MAX_VALUE)
                 .create();
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 3ad75724..98e83fdb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.file.manifest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -97,7 +97,7 @@ public class ManifestFileTest {
                         new Path(path),
                         DEFAULT_PART_TYPE,
                         "default",
-                        FileStoreOptions.FILE_FORMAT.defaultValue());
+                        CoreOptions.FILE_FORMAT.defaultValue());
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
         return new ManifestFile.Factory(
                         new SchemaManager(new Path(path)),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 6064c1cf..95bfca51 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.file.manifest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -94,7 +94,7 @@ public class ManifestListTest {
                         new Path(path),
                         TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         "default",
-                        FileStoreOptions.FILE_FORMAT.defaultValue());
+                        CoreOptions.FILE_FORMAT.defaultValue());
         return new ManifestList.Factory(TestKeyValueGenerator.DEFAULT_PART_TYPE, avro, pathFactory)
                 .create();
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 18ad6073..2de27d14 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
@@ -81,7 +82,7 @@ public class MergeTreeTest {
     private FileStorePathFactory pathFactory;
     private Comparator<RowData> comparator;
 
-    private MergeTreeOptions options;
+    private CoreOptions options;
     private DataFileReader dataFileReader;
     private DataFileWriter dataFileWriter;
     private RecordWriter<KeyValue> writer;
@@ -98,10 +99,10 @@ public class MergeTreeTest {
 
     private void recreateMergeTree(long targetFileSize) {
         Configuration configuration = new Configuration();
-        configuration.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
-        configuration.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096));
-        configuration.set(MergeTreeOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
-        options = new MergeTreeOptions(configuration);
+        configuration.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+        configuration.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+        configuration.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
+        options = new CoreOptions(configuration);
         RowType keyType = new RowType(singletonList(new RowType.RowField("k", new IntType())));
         RowType valueType = new RowType(singletonList(new RowType.RowField("v", new IntType())));
         FileFormat flushingAvro = new FlushingFileFormat("avro");
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
index 154ec613..c2ca6a68 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -84,7 +84,7 @@ public class FileStorePathFactoryTest {
                                 new LogicalType[] {new VarCharType(10), new IntType()},
                                 new String[] {"dt", "hr"}),
                         "default",
-                        FileStoreOptions.FILE_FORMAT.defaultValue());
+                        CoreOptions.FILE_FORMAT.defaultValue());
 
         assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
         assertPartition("20211224", null, pathFactory, "/dt=20211224/hr=default");
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 85579d09..67778c2d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
@@ -163,9 +163,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
     protected FileStoreTable createFileStoreTable() throws Exception {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
-        conf.set(FileStoreOptions.PATH, tablePath.toString());
-        conf.set(FileStoreOptions.FILE_FORMAT, "avro");
-        conf.set(FileStoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.FILE_FORMAT, "avro");
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
         SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema tableSchema =
                 schemaManager.commitNewVersion(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index fcc1a3f8..7e5422dc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
@@ -166,9 +166,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
     protected FileStoreTable createFileStoreTable() throws Exception {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
-        conf.set(FileStoreOptions.PATH, tablePath.toString());
-        conf.set(FileStoreOptions.FILE_FORMAT, "avro");
-        conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.FILE_FORMAT, "avro");
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
         SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema tableSchema =
                 schemaManager.commitNewVersion(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 6d12a2cd..63d238ff 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -21,9 +21,8 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -194,10 +193,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
     protected FileStoreTable createFileStoreTable(boolean changelogFile) throws Exception {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
-        conf.set(FileStoreOptions.PATH, tablePath.toString());
-        conf.set(FileStoreOptions.FILE_FORMAT, "avro");
-        conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
-        conf.set(MergeTreeOptions.CHANGELOG_FILE, changelogFile);
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.FILE_FORMAT, "avro");
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        conf.set(CoreOptions.CHANGELOG_FILE, changelogFile);
         SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema tableSchema =
                 schemaManager.commitNewVersion(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 4c6d02fe..b9f77bbd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -22,9 +22,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -89,11 +88,11 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
     protected FileStoreTable createFileStoreTable() throws Exception {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
-        conf.set(FileStoreOptions.PATH, tablePath.toString());
-        conf.set(FileStoreOptions.FILE_FORMAT, "avro");
-        conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
-        conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
-        conf.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(1024));
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.FILE_FORMAT, "avro");
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
+        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
         SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema schema =
                 schemaManager.commitNewVersion(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 98f502ac..1e63b051 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -19,10 +19,10 @@
 package org.apache.flink.table.store.mapred;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.SearchArgumentToPredicateConverter;
 import org.apache.flink.table.store.TableStoreJobConf;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -71,7 +71,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
     private FileStoreTable createFileStoreTable(JobConf jobConf) {
         TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
         Configuration conf = new Configuration();
-        conf.set(FileStoreOptions.PATH, wrapper.getLocation());
+        conf.set(CoreOptions.PATH, wrapper.getLocation());
         return FileStoreTableFactory.create(conf, wrapper.getFileStoreUser());
     }
 
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
index 10721be9..05ef9fbf 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -29,7 +28,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import java.util.List;
 
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.PATH;
 
 /** Test utils related to {@link org.apache.flink.table.store.file.FileStore}. */
 public class FileStoreTestUtils {
@@ -40,7 +39,7 @@ public class FileStoreTestUtils {
             List<String> partitionKeys,
             List<String> primaryKeys)
             throws Exception {
-        Path tablePath = FileStoreOptions.path(conf);
+        Path tablePath = CoreOptions.path(conf);
         new SchemaManager(tablePath)
                 .commitNewVersion(
                         new UpdateSchema(rowType, partitionKeys, primaryKeys, conf.toMap(), ""));
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 00e19636..05cff356 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -24,8 +24,8 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.FileStoreTestUtils;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.hive.objectinspector.TableStoreObjectInspectorFactory;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.TableWrite;
@@ -84,9 +84,9 @@ public class TableStoreHiveStorageHandlerITCase {
     public void testReadExternalTableWithPk() throws Exception {
         String path = folder.newFolder().toURI().toString();
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, path);
-        conf.setInteger(FileStoreOptions.BUCKET, 2);
-        conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+        conf.setString(CoreOptions.PATH, path);
+        conf.setInteger(CoreOptions.BUCKET, 2);
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
         FileStoreTable table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
@@ -126,9 +126,9 @@ public class TableStoreHiveStorageHandlerITCase {
     public void testReadExternalTableWithoutPk() throws Exception {
         String path = folder.newFolder().toURI().toString();
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, path);
-        conf.setInteger(FileStoreOptions.BUCKET, 2);
-        conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+        conf.setString(CoreOptions.PATH, path);
+        conf.setInteger(CoreOptions.BUCKET, 2);
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
         FileStoreTable table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
@@ -169,8 +169,8 @@ public class TableStoreHiveStorageHandlerITCase {
     public void testReadAllSupportedTypes() throws Exception {
         String root = folder.newFolder().toString();
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, root);
-        conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+        conf.setString(CoreOptions.PATH, root);
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
         FileStoreTable table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
@@ -284,8 +284,8 @@ public class TableStoreHiveStorageHandlerITCase {
     public void testPredicatePushDown() throws Exception {
         String path = folder.newFolder().toURI().toString();
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, path);
-        conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+        conf.setString(CoreOptions.PATH, path);
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
         FileStoreTable table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
@@ -374,8 +374,8 @@ public class TableStoreHiveStorageHandlerITCase {
     public void testDateAndTimestamp() throws Exception {
         String path = folder.newFolder().toURI().toString();
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, path);
-        conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+        conf.setString(CoreOptions.PATH, path);
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
         FileStoreTable table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 6befdfff..610dea4f 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -26,9 +26,9 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.FileStoreTestUtils;
 import org.apache.flink.table.store.RowDataContainer;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -57,8 +57,8 @@ public class TableStoreRecordReaderTest {
     @Test
     public void testPk() throws Exception {
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, tempDir.toString());
-        conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+        conf.setString(CoreOptions.PATH, tempDir.toString());
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
         FileStoreTable table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
@@ -98,8 +98,8 @@ public class TableStoreRecordReaderTest {
     @Test
     public void testValueCount() throws Exception {
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, tempDir.toString());
-        conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+        conf.setString(CoreOptions.PATH, tempDir.toString());
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
         FileStoreTable table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
index bf0aa4c5..22d9d3f5 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.types.RowKind;
 
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
index 7365e44d..7698bcfe 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
 
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index 861f77bb..59c0d4ab 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -24,8 +24,8 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.log.LogOptions.LogConsistency;
-import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.types.DataType;
 
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index b247f575..85870e9f 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -55,16 +55,16 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LOG_FORMAT;
+import static org.apache.flink.table.store.CoreOptions.LOG_KEY_FORMAT;
+import static org.apache.flink.table.store.CoreOptions.LOG_RETENTION;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
+import static org.apache.flink.table.store.CoreOptions.LogConsistency;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
-import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.FORMAT;
-import static org.apache.flink.table.store.log.LogOptions.KEY_FORMAT;
-import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
-import static org.apache.flink.table.store.log.LogOptions.RETENTION;
-import static org.apache.flink.table.store.log.LogOptions.SCAN;
-import static org.apache.flink.table.store.log.LogOptions.SCAN_TIMESTAMP_MILLS;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
 
 /** The Kafka {@link LogStoreTableFactory} implementation. */
@@ -89,14 +89,14 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(SCAN);
+        options.add(LOG_SCAN);
         options.add(TOPIC);
-        options.add(SCAN_TIMESTAMP_MILLS);
-        options.add(RETENTION);
-        options.add(CONSISTENCY);
-        options.add(CHANGELOG_MODE);
-        options.add(KEY_FORMAT);
-        options.add(FORMAT);
+        options.add(LOG_SCAN_TIMESTAMP_MILLS);
+        options.add(LOG_RETENTION);
+        options.add(LOG_CONSISTENCY);
+        options.add(LOG_CHANGELOG_MODE);
+        options.add(LOG_KEY_FORMAT);
+        options.add(LOG_FORMAT);
         return options;
     }
 
@@ -124,7 +124,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
         try (AdminClient adminClient = AdminClient.create(toKafkaProperties(helper.getOptions()))) {
             Map<String, String> configs = new HashMap<>();
             helper.getOptions()
-                    .getOptional(RETENTION)
+                    .getOptional(LOG_RETENTION)
                     .ifPresent(
                             retention ->
                                     configs.put(
@@ -210,9 +210,9 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                 primaryKeyDeserializer,
                 valueDeserializer,
                 projectFields,
-                helper.getOptions().get(CONSISTENCY),
-                helper.getOptions().get(SCAN),
-                helper.getOptions().get(SCAN_TIMESTAMP_MILLS));
+                helper.getOptions().get(LOG_CONSISTENCY),
+                helper.getOptions().get(LOG_SCAN),
+                helper.getOptions().get(LOG_SCAN_TIMESTAMP_MILLS));
     }
 
     @Override
@@ -237,8 +237,8 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                 toKafkaProperties(helper.getOptions()),
                 primaryKeySerializer,
                 valueSerializer,
-                helper.getOptions().get(CONSISTENCY),
-                helper.getOptions().get(CHANGELOG_MODE));
+                helper.getOptions().get(LOG_CONSISTENCY),
+                helper.getOptions().get(LOG_CHANGELOG_MODE));
     }
 
     private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
@@ -261,7 +261,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                                         optionMap.get(key)));
 
         // Add read committed for transactional consistency mode.
-        if (options.get(CONSISTENCY) == LogConsistency.TRANSACTIONAL) {
+        if (options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL) {
             properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed");
         }
         return properties;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index 30c66cfa..e6f39c90 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -23,9 +23,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.log.LogOptions.LogConsistency;
 import org.apache.flink.types.RowKind;
 
 import org.junit.Assert;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
index 35ff9960..7f4f5df7 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.kafka;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index ac225429..343525e6 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -54,13 +54,13 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import static org.apache.flink.table.store.CoreOptions.LogConsistency;
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
-import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
 
 /** Utils for the test of {@link KafkaLogStoreFactory}. */
 public class KafkaLogTestUtils {
@@ -188,8 +188,8 @@ public class KafkaLogTestUtils {
             RowType type,
             int[] keys) {
         Map<String, String> options = new HashMap<>();
-        options.put(CHANGELOG_MODE.key(), changelogMode.toString());
-        options.put(CONSISTENCY.key(), consistency.toString());
+        options.put(LOG_CHANGELOG_MODE.key(), changelogMode.toString());
+        options.put(LOG_CONSISTENCY.key(), consistency.toString());
         options.put(BOOTSTRAP_SERVERS.key(), servers);
         options.put(TOPIC.key(), UUID.randomUUID().toString());
         return createContext(name, type, keys, options);
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 7231752f..14656cb5 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.spark;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -44,7 +44,7 @@ public class SimpleTableTestHelper {
     public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
         Map<String, String> options = new HashMap<>();
         // orc is shaded, can not find shaded classes in ide
-        options.put(FileStoreOptions.FILE_FORMAT.key(), "avro");
+        options.put(CoreOptions.FILE_FORMAT.key(), "avro");
         new SchemaManager(path)
                 .commitNewVersion(
                         new UpdateSchema(