Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/05 06:43:41 UTC

[flink-table-store] branch master updated: [FLINK-28387] Organizing Options in table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c233679e [FLINK-28387] Organizing Options in table store
c233679e is described below

commit c233679e5e0a3dea7d98c4d3487466d1139da6b8
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 5 14:43:37 2022 +0800

    [FLINK-28387] Organizing Options in table store
    
    This closes #197
---
 docs/content/docs/development/create-table.md      |   4 +-
 .../store/connector/TableStoreManagedFactory.java  |  30 +--
 .../store/connector/AbstractTableStoreFactory.java |  51 ++--
 ...toryOptions.java => FlinkConnectorOptions.java} |  41 ++-
 .../store/connector/sink/FlinkSinkBuilder.java     |   6 +-
 .../table/store/connector/sink/TableStoreSink.java |  30 +--
 .../store/connector/source/FlinkSourceBuilder.java |  25 +-
 .../store/connector/source/TableStoreSource.java   |  33 ++-
 .../ComputedColumnAndWatermarkTableITCase.java     |  16 +-
 .../table/store/connector/CreateTableITCase.java   |   7 +-
 .../table/store/connector/DropTableITCase.java     |   4 +-
 .../store/connector/FileStoreTableITCase.java      |   6 +-
 .../store/connector/FileSystemCatalogITCase.java   |   8 +-
 .../store/connector/ReadWriteTableITCase.java      |   6 +-
 .../store/connector/ReadWriteTableTestBase.java    |  23 +-
 .../store/connector/StreamingWarehouseITCase.java  |   2 +-
 .../connector/TableStoreManagedFactoryTest.java    |  44 +---
 .../table/store/connector/TableStoreTestBase.java  |  14 +-
 .../store/connector/sink/LogStoreSinkITCase.java   |   9 +-
 .../source/TestChangelogDataReadWrite.java         |   2 +-
 .../apache/flink/table/store/CatalogOptions.java   |  25 +-
 .../org/apache/flink/table/store/CoreOptions.java  | 200 ++++++--------
 .../table/store/file/AppendOnlyFileStore.java      |   2 +-
 .../flink/table/store/file/FileStoreOptions.java   | 289 ---------------------
 .../flink/table/store/file/KeyValueFileStore.java  |   2 +
 .../table/store/file/catalog/CatalogFactory.java   |  16 +-
 .../store/file/mergetree/MergeTreeOptions.java     | 178 -------------
 .../file/operation/KeyValueFileStoreWrite.java     |  26 +-
 .../apache/flink/table/store/log/LogOptions.java   | 194 --------------
 .../table/store/log/LogStoreTableFactory.java      |  24 +-
 .../table/store/table/sink/MemoryTableWrite.java   |   2 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  19 +-
 .../flink/table/store/tests/LogStoreE2eTest.java   |   2 +-
 .../flink/table/store/hive/HiveCatalogFactory.java |  10 +-
 .../flink/table/store/kafka/KafkaLogOptions.java   |   2 +-
 .../table/store/kafka/KafkaLogStoreFactory.java    |  21 +-
 .../flink/table/store/kafka/KafkaLogTestUtils.java |   4 +-
 37 files changed, 310 insertions(+), 1067 deletions(-)

diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md
index cba5a9da..cec6c481 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -115,14 +115,14 @@ Important options include the following:
       <td>The log system used to keep changes of the table, supports 'kafka'.</td>
     </tr>
     <tr>
-      <td><h5>log.kafka.bootstrap.servers</h5></td>
+      <td><h5>kafka.bootstrap.servers</h5></td>
       <td>No</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Required Kafka server connection string for log store.</td>
     </tr>
     <tr>
-      <td><h5>log.topic</h5></td>
+      <td><h5>kafka.topic</h5></td>
       <td>No</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
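
The docs change above renames the Kafka log options to flat keys: 'log.kafka.bootstrap.servers' becomes 'kafka.bootstrap.servers' and 'log.topic' becomes 'kafka.topic'. The sketch below shows a DDL using the renamed keys, mirroring the updated FileSystemCatalogITCase later in this patch; it is a minimal sketch only, and the table name, bootstrap servers, and topic are placeholder values, not part of the commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CreateTableWithKafkaLogExample {
        public static void main(String[] args) {
            // Assumes the table store connector and the Kafka log store are on the
            // classpath and a table store catalog / managed table setup is in place.
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
                            + " 'log.system' = 'kafka',"
                            // formerly 'log.kafka.bootstrap.servers' and 'log.topic'
                            + " 'kafka.bootstrap.servers' = 'localhost:9092',"
                            + " 'kafka.topic' = 'T'"
                            + ")");
        }
    }
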
diff --git a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 431e5c9b..37dfda61 100644
--- a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.table.store.connector;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.factories.ManagedTableFactory;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -31,6 +29,7 @@ import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.util.Preconditions;
 
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.HashMap;
@@ -39,13 +38,13 @@ import java.util.Objects;
 import java.util.Optional;
 
 import static org.apache.flink.table.store.CoreOptions.BUCKET;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
 import static org.apache.flink.table.store.CoreOptions.PATH;
-import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
 import static org.apache.flink.table.store.CoreOptions.WRITE_MODE;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_PARTITION_SPEC;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
 import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY;
 
 /** Default implementation of {@link ManagedTableFactory}. */
@@ -85,21 +84,12 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
                 createOptionalLogStoreFactory(context.getClassLoader(), enrichedOptions);
         logFactory.ifPresent(
                 factory ->
-                        factory.enrichOptions(createLogContext(context, enrichedOptions))
-                                .forEach((k, v) -> enrichedOptions.putIfAbsent(LOG_PREFIX + k, v)));
+                        factory.enrichOptions(new TableStoreDynamicContext(context, enrichedOptions))
+                                .forEach(enrichedOptions::putIfAbsent));
 
         return enrichedOptions;
     }
 
-    @VisibleForTesting
-    static String relativeTablePath(ObjectIdentifier tableIdentifier) {
-        return String.format(
-                "%s.catalog/%s.db/%s",
-                tableIdentifier.getCatalogName(),
-                tableIdentifier.getDatabaseName(),
-                tableIdentifier.getObjectName());
-    }
-
     @Override
     public void onCreateTable(Context context, boolean ignoreIfExists) {
         Map<String, String> options = context.getCatalogTable().getOptions();
@@ -151,7 +141,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
                 .ifPresent(
                         factory ->
                                 factory.onCreateTable(
-                                        createLogContext(context),
+                                        context,
                                         Integer.parseInt(
                                                 options.getOrDefault(
                                                         BUCKET.key(),
@@ -179,7 +169,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
         createOptionalLogStoreFactory(context)
                 .ifPresent(
                         factory ->
-                                factory.onDropTable(createLogContext(context), ignoreIfNotExists));
+                                factory.onDropTable(context, ignoreIfNotExists));
     }
 
     @Override
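
For readers less familiar with enrichOptions: session configuration keys carrying the 'table-store.' prefix (now FlinkConnectorOptions.TABLE_STORE_PREFIX) are stripped of the prefix and merged into the table options, as exercised by TableStoreManagedFactoryTest further down. A stand-alone sketch of just that prefix handling follows; the stripPrefix helper is hypothetical and only illustrates the mapping, it is not code from the patch.

    import java.util.HashMap;
    import java.util.Map;

    public class TableStorePrefixExample {

        static final String TABLE_STORE_PREFIX = "table-store.";

        // Hypothetical helper: keep only the keys carrying the prefix and strip it,
        // which is the mapping enrichOptions applies to the session configuration.
        static Map<String, String> stripPrefix(Map<String, String> sessionOptions) {
            Map<String, String> tableOptions = new HashMap<>();
            sessionOptions.forEach(
                    (k, v) -> {
                        if (k.startsWith(TABLE_STORE_PREFIX)) {
                            tableOptions.put(k.substring(TABLE_STORE_PREFIX.length()), v);
                        }
                    });
            return tableOptions;
        }

        public static void main(String[] args) {
            Map<String, String> session = new HashMap<>();
            session.put("table-store.root-path", "my_path");
            session.put("table-store.log.system", "kafka");

            // e.g. {root-path=my_path, log.system=kafka}
            System.out.println(stripPrefix(session));
        }
    }
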
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index 54de82ff..4f729bf6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -21,13 +21,15 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
 import org.apache.flink.table.store.connector.sink.TableStoreSink;
 import org.apache.flink.table.store.connector.source.TableStoreSource;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -39,17 +41,16 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
 import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
 
 /** Abstract table store factory to create table source and table sink. */
@@ -63,7 +64,7 @@ public abstract class AbstractTableStoreFactory
                 buildFileStoreTable(context),
                 context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING,
-                createLogContext(context),
+                context,
                 createOptionalLogStoreFactory(context).orElse(null));
     }
 
@@ -72,7 +73,7 @@ public abstract class AbstractTableStoreFactory
         return new TableStoreSink(
                 context.getObjectIdentifier(),
                 buildFileStoreTable(context),
-                createLogContext(context),
+                context,
                 createOptionalLogStoreFactory(context).orElse(null));
     }
 
@@ -83,9 +84,9 @@ public abstract class AbstractTableStoreFactory
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> options = CoreOptions.allOptions();
-        options.addAll(CoreOptions.allOptions());
-        options.addAll(TableStoreFactoryOptions.allOptions());
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.addAll(FlinkConnectorOptions.getOptions());
+        options.addAll(CoreOptions.getOptions());
         return options;
     }
 
@@ -112,44 +113,24 @@ public abstract class AbstractTableStoreFactory
     }
 
     private static void validateFileStoreContinuous(Configuration options) {
-        Configuration logOptions = new DelegatingConfiguration(options, LOG_PREFIX);
-        CoreOptions.LogChangelogMode changelogMode = logOptions.get(LOG_CHANGELOG_MODE);
-        if (changelogMode == CoreOptions.LogChangelogMode.UPSERT) {
+        LogChangelogMode changelogMode = options.get(LOG_CHANGELOG_MODE);
+        if (changelogMode == LogChangelogMode.UPSERT) {
             throw new ValidationException(
                     "File store continuous reading dose not support upsert changelog mode.");
         }
-        CoreOptions.LogConsistency consistency = logOptions.get(LOG_CONSISTENCY);
-        if (consistency == CoreOptions.LogConsistency.EVENTUAL) {
+        LogConsistency consistency = options.get(LOG_CONSISTENCY);
+        if (consistency == LogConsistency.EVENTUAL) {
             throw new ValidationException(
                     "File store continuous reading dose not support eventual consistency mode.");
         }
-        CoreOptions.LogStartupMode startupMode = logOptions.get(LOG_SCAN);
-        if (startupMode == CoreOptions.LogStartupMode.FROM_TIMESTAMP) {
+        LogStartupMode startupMode = options.get(LOG_SCAN);
+        if (startupMode == LogStartupMode.FROM_TIMESTAMP) {
             throw new ValidationException(
                     "File store continuous reading dose not support from_timestamp scan mode, "
                             + "you can add timestamp filters instead.");
         }
     }
 
-    static DynamicTableFactory.Context createLogContext(DynamicTableFactory.Context context) {
-        return createLogContext(context, context.getCatalogTable().getOptions());
-    }
-
-    static DynamicTableFactory.Context createLogContext(
-            DynamicTableFactory.Context context, Map<String, String> options) {
-        return new TableStoreDynamicContext(context, filterLogStoreOptions(options));
-    }
-
-    static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
-        return options.entrySet().stream()
-                .filter(entry -> !entry.getKey().equals(LOG_SYSTEM.key())) // exclude log.system
-                .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
-                .collect(
-                        Collectors.toMap(
-                                entry -> entry.getKey().substring(LOG_PREFIX.length()),
-                                Map.Entry::getValue));
-    }
-
     static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context context) {
         FileStoreTable table =
                 FileStoreTableFactory.create(
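
The validateFileStoreContinuous change above is representative of the whole patch: log options are no longer read through a DelegatingConfiguration with the 'log.' prefix but are looked up directly on the table options under their CoreOptions keys. A minimal sketch of the new access pattern (the option value here is a placeholder, not taken from the commit):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions.LogChangelogMode;

    import java.util.HashMap;
    import java.util.Map;

    import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;

    public class FlatLogOptionExample {
        public static void main(String[] args) {
            // Table options as stored in the schema: the CoreOptions key is used as-is,
            // no 'log.' prefix stripping is needed anymore.
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put(LOG_CHANGELOG_MODE.key(), LogChangelogMode.ALL.toString());

            Configuration conf = Configuration.fromMap(tableOptions);
            LogChangelogMode mode = conf.get(LOG_CHANGELOG_MODE);
            System.out.println(mode); // prints the configured changelog mode
        }
    }
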
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
similarity index 71%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
index 38d6a209..e74a8e5d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
@@ -21,14 +21,19 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.factories.FactoryUtil;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
 
-/** Options for {@link TableStoreManagedFactory}. */
-public class TableStoreFactoryOptions {
+/** Options for flink connector. */
+public class FlinkConnectorOptions {
 
+    public static final String TABLE_STORE_PREFIX = "table-store.";
+
+    @Internal
     public static final ConfigOption<String> ROOT_PATH =
             ConfigOptions.key("root-path")
                     .stringType()
@@ -68,11 +73,27 @@ public class TableStoreFactoryOptions {
                                     + "By default, if this option is not defined, the planner will derive the parallelism "
                                     + "for each statement individually by also considering the global configuration.");
 
-    public static Set<ConfigOption<?>> allOptions() {
-        Set<ConfigOption<?>> allOptions = new HashSet<>();
-        allOptions.add(LOG_SYSTEM);
-        allOptions.add(SINK_PARALLELISM);
-        allOptions.add(SCAN_PARALLELISM);
-        return allOptions;
+    public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
+        return String.format(
+                "%s.catalog/%s.db/%s",
+                tableIdentifier.getCatalogName(),
+                tableIdentifier.getDatabaseName(),
+                tableIdentifier.getObjectName());
+    }
+
+    @Internal
+    public static List<ConfigOption<?>> getOptions() {
+        final Field[] fields = FlinkConnectorOptions.class.getFields();
+        final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
+        for (Field field : fields) {
+            if (ConfigOption.class.isAssignableFrom(field.getType())) {
+                try {
+                    list.add((ConfigOption<?>) field.get(FlinkConnectorOptions.class));
+                } catch (IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return list;
     }
 }
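
Two things worth noting in the renamed FlinkConnectorOptions: relativeTablePath (moved here from the managed factory) derives the on-disk layout '<catalog>.catalog/<db>.db/<table>', and getOptions() collects every public ConfigOption field reflectively. A small usage sketch, with placeholder identifier values and the path format matching the TableStoreManagedFactoryTest expectation further down:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.store.connector.FlinkConnectorOptions;

    public class FlinkConnectorOptionsExample {
        public static void main(String[] args) {
            // Relative path under the configured root path.
            String path =
                    FlinkConnectorOptions.relativeTablePath(
                            ObjectIdentifier.of("catalog", "database", "table"));
            System.out.println(path); // catalog.catalog/database.db/table

            // Every public ConfigOption field, discovered via reflection in getOptions().
            for (ConfigOption<?> option : FlinkConnectorOptions.getOptions()) {
                System.out.println(option.key());
            }
        }
    }
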
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index d0d9cccb..fa3cfeb4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
+import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
@@ -81,7 +81,7 @@ public class FlinkSinkBuilder {
     @SuppressWarnings("unchecked")
     @Nullable
     private Map<String, String> getCompactPartSpec() {
-        String json = conf.get(TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC);
+        String json = conf.get(FlinkConnectorOptions.COMPACTION_PARTITION_SPEC);
         if (json == null) {
             return null;
         }
@@ -103,7 +103,7 @@ public class FlinkSinkBuilder {
                 new StoreSink(
                         tableIdentifier,
                         table,
-                        conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED),
+                        conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
                         getCompactPartSpec(),
                         lockFactory,
                         overwritePartition,
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 0f4e5dac..f2fcb843 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -28,9 +27,9 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
-import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -45,12 +44,14 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+
 /** Table sink to create {@link StoreSink}. */
 public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
 
     private final ObjectIdentifier tableIdentifier;
     private final FileStoreTable table;
-    private final DynamicTableFactory.Context logStoreContext;
+    private final DynamicTableFactory.Context context;
     @Nullable private final LogStoreTableFactory logStoreTableFactory;
 
     private Map<String, String> staticPartitions = new HashMap<>();
@@ -60,11 +61,11 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
     public TableStoreSink(
             ObjectIdentifier tableIdentifier,
             FileStoreTable table,
-            DynamicTableFactory.Context logStoreContext,
+            DynamicTableFactory.Context context,
             @Nullable LogStoreTableFactory logStoreTableFactory) {
         this.tableIdentifier = tableIdentifier;
         this.table = table;
-        this.logStoreContext = logStoreContext;
+        this.context = context;
         this.logStoreTableFactory = logStoreTableFactory;
     }
 
@@ -76,12 +77,8 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
             // no primary key, sink all changelogs
             return requestedMode;
         } else if (table instanceof ChangelogWithKeyFileStoreTable) {
-            Configuration logOptions =
-                    new DelegatingConfiguration(
-                            Configuration.fromMap(table.schema().options()),
-                            CoreOptions.LOG_PREFIX);
-            if (logOptions.get(CoreOptions.LOG_CHANGELOG_MODE)
-                    != CoreOptions.LogChangelogMode.ALL) {
+            Configuration options = Configuration.fromMap(table.schema().options());
+            if (options.get(LOG_CHANGELOG_MODE) != LogChangelogMode.ALL) {
                 // with primary key, default sink upsert
                 ChangelogMode.Builder builder = ChangelogMode.newBuilder();
                 for (RowKind kind : requestedMode.getContainedKinds()) {
@@ -110,13 +107,13 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         LogSinkProvider logSinkProvider = null;
         if (logStoreTableFactory != null) {
-            logSinkProvider = logStoreTableFactory.createSinkProvider(logStoreContext, context);
+            logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
         }
 
         Configuration conf = Configuration.fromMap(table.schema().options());
         // Do not sink to log store when overwrite mode
         final LogSinkFunction logSinkFunction =
-                overwrite || conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED)
+                overwrite || conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED)
                         ? null
                         : (logSinkProvider == null ? null : logSinkProvider.createSink());
         return new TableStoreDataStreamSinkProvider(
@@ -129,15 +126,14 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
                                 .withLockFactory(lockFactory)
                                 .withLogSinkFunction(logSinkFunction)
                                 .withOverwritePartition(overwrite ? staticPartitions : null)
-                                .withParallelism(
-                                        conf.get(TableStoreFactoryOptions.SINK_PARALLELISM))
+                                .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
                                 .build());
     }
 
     @Override
     public DynamicTableSink copy() {
         TableStoreSink copied =
-                new TableStoreSink(tableIdentifier, table, logStoreContext, logStoreTableFactory);
+                new TableStoreSink(tableIdentifier, table, context, logStoreTableFactory);
         copied.staticPartitions = new HashMap<>(staticPartitions);
         copied.overwrite = overwrite;
         copied.lockFactory = lockFactory;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 396f6d94..2bdaf7c3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.connector.base.source.hybrid.HybridSource;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -30,8 +29,8 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.MergeEngine;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -43,6 +42,11 @@ import javax.annotation.Nullable;
 
 import java.util.Optional;
 
+import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
+import static org.apache.flink.table.store.CoreOptions.MERGE_ENGINE;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
+
 /** Source builder to build a Flink {@link Source}. */
 public class FlinkSourceBuilder {
 
@@ -94,7 +98,7 @@ public class FlinkSourceBuilder {
     }
 
     private long discoveryIntervalMills() {
-        return conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
+        return conf.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
     }
 
     private FileStoreSource buildFileSource(boolean isContinuous, boolean continuousScanLatest) {
@@ -111,19 +115,16 @@ public class FlinkSourceBuilder {
         if (isContinuous) {
             // TODO move validation to a dedicated method
             if (table.schema().primaryKeys().size() > 0
-                    && conf.get(CoreOptions.MERGE_ENGINE)
-                            == CoreOptions.MergeEngine.PARTIAL_UPDATE) {
+                    && conf.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) {
                 throw new ValidationException(
                         "Partial update continuous reading is not supported.");
             }
 
-            CoreOptions.LogStartupMode startupMode =
-                    new DelegatingConfiguration(conf, CoreOptions.LOG_PREFIX)
-                            .get(CoreOptions.LOG_SCAN);
+            LogStartupMode startupMode = conf.get(LOG_SCAN);
             if (logSourceProvider == null) {
-                return buildFileSource(true, startupMode == CoreOptions.LogStartupMode.LATEST);
+                return buildFileSource(true, startupMode == LogStartupMode.LATEST);
             } else {
-                if (startupMode != CoreOptions.LogStartupMode.FULL) {
+                if (startupMode != LogStartupMode.FULL) {
                     return logSourceProvider.createSource(null);
                 }
                 return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
@@ -151,7 +152,7 @@ public class FlinkSourceBuilder {
                         .orElse(rowType);
         DataStreamSource<RowData> dataStream =
                 env.fromSource(
-                        conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED)
+                        conf.get(COMPACTION_MANUAL_TRIGGERED)
                                 ? new FileStoreEmptySource()
                                 : buildSource(),
                         WatermarkStrategy.noWatermarks(),
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 880c6a9c..c5545658 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -28,9 +27,10 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
+import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
-import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.predicate.PredicateConverter;
@@ -47,6 +47,9 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+
 /**
  * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
  * For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
@@ -59,7 +62,7 @@ public class TableStoreSource
     private final ObjectIdentifier tableIdentifier;
     private final FileStoreTable table;
     private final boolean streaming;
-    private final DynamicTableFactory.Context logStoreContext;
+    private final DynamicTableFactory.Context context;
     @Nullable private final LogStoreTableFactory logStoreTableFactory;
 
     @Nullable private Predicate predicate;
@@ -69,12 +72,12 @@ public class TableStoreSource
             ObjectIdentifier tableIdentifier,
             FileStoreTable table,
             boolean streaming,
-            DynamicTableFactory.Context logStoreContext,
+            DynamicTableFactory.Context context,
             @Nullable LogStoreTableFactory logStoreTableFactory) {
         this.tableIdentifier = tableIdentifier;
         this.table = table;
         this.streaming = streaming;
-        this.logStoreContext = logStoreContext;
+        this.context = context;
         this.logStoreTableFactory = logStoreTableFactory;
     }
 
@@ -92,14 +95,9 @@ public class TableStoreSource
         } else if (table instanceof ChangelogWithKeyFileStoreTable) {
             // optimization: transaction consistency and all changelog mode avoid the generation of
             // normalized nodes. See TableStoreSink.getChangelogMode validation.
-            Configuration logOptions =
-                    new DelegatingConfiguration(
-                            Configuration.fromMap(table.schema().options()),
-                            CoreOptions.LOG_PREFIX);
-            return logOptions.get(CoreOptions.LOG_CONSISTENCY)
-                                    == CoreOptions.LogConsistency.TRANSACTIONAL
-                            && logOptions.get(CoreOptions.LOG_CHANGELOG_MODE)
-                                    == CoreOptions.LogChangelogMode.ALL
+            Configuration options = Configuration.fromMap(table.schema().options());
+            return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
+                            && options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL
                     ? ChangelogMode.all()
                     : ChangelogMode.upsert();
         } else {
@@ -113,8 +111,7 @@ public class TableStoreSource
         LogSourceProvider logSourceProvider = null;
         if (logStoreTableFactory != null) {
             logSourceProvider =
-                    logStoreTableFactory.createSourceProvider(
-                            logStoreContext, scanContext, projectFields);
+                    logStoreTableFactory.createSourceProvider(context, scanContext, projectFields);
         }
 
         FlinkSourceBuilder sourceBuilder =
@@ -125,7 +122,7 @@ public class TableStoreSource
                         .withPredicate(predicate)
                         .withParallelism(
                                 Configuration.fromMap(table.schema().options())
-                                        .get(TableStoreFactoryOptions.SCAN_PARALLELISM));
+                                        .get(FlinkConnectorOptions.SCAN_PARALLELISM));
 
         return new TableStoreDataStreamScanProvider(
                 !streaming, env -> sourceBuilder.withEnv(env).build());
@@ -135,7 +132,7 @@ public class TableStoreSource
     public DynamicTableSource copy() {
         TableStoreSource copied =
                 new TableStoreSource(
-                        tableIdentifier, table, streaming, logStoreContext, logStoreTableFactory);
+                        tableIdentifier, table, streaming, context, logStoreTableFactory);
         copied.predicate = predicate;
         copied.projectFields = projectFields;
         return copied;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
index be277d35..febc784d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
 
 import org.junit.Test;
 
@@ -29,7 +29,7 @@ import java.util.Collections;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.ratesWithTimestamp;
 
@@ -146,8 +146,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         null,
                         false,
                         Collections.singletonMap(
-                                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
-                                CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                                LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
                         "rate_by_to_currency IS NULL",
                         Arrays.asList(
                                 "corrected_rate_by_to_currency",
@@ -191,8 +190,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
                         false,
                         Collections.singletonMap(
-                                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
-                                CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                                LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
                         lateEventFilter,
                         Collections.emptyList(), // projection
                         Collections.singletonList(
@@ -215,8 +213,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         WatermarkSpec.of("ts1", "ts1 - INTERVAL '3' YEAR"),
                         false,
                         Collections.singletonMap(
-                                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
-                                CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                                LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
                         lateEventFilter.replaceAll("ts", "ts1"),
                         Arrays.asList("currency", "rate", "ts1"),
                         Collections.singletonList(
@@ -240,8 +237,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
                         WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
                         false,
                         Collections.singletonMap(
-                                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
-                                CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                                LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
                         lateEventFilter,
                         Arrays.asList(
                                 "currency",
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
index 9e3be19f..68703685 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -42,6 +41,7 @@ import java.util.List;
 import java.util.UUID;
 
 import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -79,8 +79,7 @@ public class CreateTableITCase extends TableStoreTestBase {
             assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
                     .isPresent();
             // check table store
-            assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile())
-                    .exists();
+            assertThat(Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile()).exists();
             // check log store
             assertThat(topicExists(tableIdentifier.asSummaryString())).isEqualTo(enableLogStore);
         } else {
@@ -129,7 +128,7 @@ public class CreateTableITCase extends TableStoreTestBase {
             }
         } else if (expectedResult.expectedMessage.startsWith("Failed to create file store path.")) {
             // failed when creating file store
-            Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile().mkdirs();
+            Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile().mkdirs();
         } else if (expectedResult.expectedMessage.startsWith("Failed to create kafka topic.")) {
             // failed when creating log store
             createTopicIfNotExists(tableIdentifier.asSummaryString(), BUCKET.defaultValue());
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
index e7278976..177d16c4 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -40,6 +39,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -80,7 +80,7 @@ public class DropTableITCase extends TableStoreTestBase {
             assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
                     .isNotPresent();
             // check table store
-            assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile())
+            assertThat(Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile())
                     .doesNotExist();
             // check log store
             assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 4e313402..754d1354 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -51,9 +51,9 @@ import java.time.Duration;
 import java.util.List;
 
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.CoreOptions.relativeTablePath;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /** ITCase for file store table api. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
index 01e84211..69ee10c2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
@@ -60,8 +60,8 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase {
                     String.format(
                             "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
                                     + "'log.system'='kafka', "
-                                    + "'log.kafka.bootstrap.servers'='%s',"
-                                    + "'log.topic'='%s'"
+                                    + "'kafka.bootstrap.servers'='%s',"
+                                    + "'kafka.topic'='%s'"
                                     + ")",
                             getBootstrapServers(), topic));
             innerTestWriteRead();
@@ -85,8 +85,8 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase {
                                     + "d AS CAST(c as INT) + 1"
                                     + ") WITH ("
                                     + "'log.system'='kafka', "
-                                    + "'log.kafka.bootstrap.servers'='%s',"
-                                    + "'log.topic'='%s'"
+                                    + "'kafka.bootstrap.servers'='%s',"
+                                    + "'kafka.topic'='%s'"
                                     + ")",
                             getBootstrapServers(), topic));
             BlockingIterator<Row, Row> iterator =
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 10adddfb..0f4d5ea5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -58,14 +58,14 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.SCAN_PARALLELISM;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.SINK_PARALLELISM;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRates;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithUB;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
 import static org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM;
 import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index 33d28349..3c9fcb76 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
 import org.apache.flink.types.Row;
@@ -49,7 +49,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithChangelogRecords;
 import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithInsertOnlyRecords;
 import static org.apache.flink.table.store.connector.ShowCreateUtil.buildInsertIntoQuery;
@@ -57,8 +61,6 @@ import static org.apache.flink.table.store.connector.ShowCreateUtil.buildInsertO
 import static org.apache.flink.table.store.connector.ShowCreateUtil.buildSelectQuery;
 import static org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery;
 import static org.apache.flink.table.store.connector.ShowCreateUtil.createTableLikeDDL;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -71,7 +73,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
     protected void checkFileStorePath(
             StreamTableEnvironment tEnv, String managedTable, @Nullable String partitionList) {
         String relativeFilePath =
-                CoreOptions.relativeTablePath(
+                relativeTablePath(
                         ObjectIdentifier.of(
                                 tEnv.getCurrentCatalog(), tEnv.getCurrentDatabase(), managedTable));
         // check snapshot file path
@@ -230,9 +232,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
             List<Row> expected)
             throws Exception {
         Map<String, String> hints = new HashMap<>();
-        hints.put(
-                LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
-                CoreOptions.LogStartupMode.LATEST.name().toLowerCase());
+        hints.put(LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase());
         collectAndCheckUnderSameEnv(
                         true,
                         true,
@@ -258,9 +258,8 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
             List<Row> expected)
             throws Exception {
         Map<String, String> hints = new HashMap<>();
-        hints.put(LOG_PREFIX + CoreOptions.LOG_SCAN.key(), "from-timestamp");
-        hints.put(
-                LOG_PREFIX + CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
+        hints.put(LOG_SCAN.key(), "from-timestamp");
+        hints.put(LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
         collectAndCheckUnderSameEnv(
                         true,
                         true,
@@ -292,7 +291,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
         tableOptions.put(ROOT_PATH.key(), rootPath);
         if (enableLogStore) {
             tableOptions.put(LOG_SYSTEM.key(), "kafka");
-            tableOptions.put(LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+            tableOptions.put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
         }
         String sourceTable = "source_table_" + UUID.randomUUID();
         String managedTable = "managed_table_" + UUID.randomUUID();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
index 598fb75f..15424e02 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
@@ -90,7 +90,7 @@ public class StreamingWarehouseITCase extends ReadWriteTableTestBase {
                                 + "WITH (\n"
                                 + "    'path' = '%s',\n"
                                 + "    'log.system' = 'kafka', "
-                                + "    'log.kafka.bootstrap.servers' = '%s');",
+                                + "    'kafka.bootstrap.servers' = '%s');",
                         rootPath, getBootstrapServers());
         streamTableEnv.executeSql(orderSource);
         streamTableEnv.executeSql(cleanedOrders);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index 31832f59..843b99f5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -57,13 +57,13 @@ import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
 import static org.apache.flink.table.store.CoreOptions.PATH;
-import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
 import static org.apache.flink.table.store.CoreOptions.path;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_PARTITION_SPEC;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
 import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
 import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PART_TYPE;
 import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
@@ -122,20 +122,20 @@ public class TableStoreManagedFactoryTest {
         Map<String, String> sessionMap = new HashMap<>();
         sessionMap.put("table-store.root-path", "my_path");
         sessionMap.put("table-store.log.system", "kafka");
-        sessionMap.put("table-store.log.topic", "my_topic");
+        sessionMap.put("table-store.kafka.topic", "my_topic");
         context = createNonEnrichedContext(sessionMap, emptyMap());
         assertThatThrownBy(() -> tableStoreManagedFactory.enrichOptions(context))
                 .hasMessage(
                         "Managed table can not contain custom topic. You need to remove topic in table options or session config.");
 
-        sessionMap.remove("table-store.log.topic");
+        sessionMap.remove("table-store.kafka.topic");
         context = createNonEnrichedContext(sessionMap, emptyMap());
         Map<String, String> enriched = tableStoreManagedFactory.enrichOptions(context);
 
         Map<String, String> expected = new HashMap<>();
         expected.put("path", "my_path/catalog.catalog/database.db/table");
         expected.put("log.system", "kafka");
-        expected.put("log.topic", "catalog.database.table");
+        expected.put("kafka.topic", "catalog.database.table");
         assertThat(enriched).containsExactlyEntriesOf(expected);
     }
 
@@ -188,27 +188,6 @@ public class TableStoreManagedFactoryTest {
         }
     }
 
-    @Test
-    public void testFilterLogStoreOptions() {
-        // mix invalid key and leave value to empty to emphasize the deferred validation
-        Map<String, String> expectedLogOptions =
-                of(
-                        CoreOptions.LOG_SCAN.key(),
-                        "",
-                        CoreOptions.LOG_RETENTION.key(),
-                        "",
-                        "dummy.key",
-                        "",
-                        CoreOptions.LOG_CHANGELOG_MODE.key(),
-                        "");
-        Map<String, String> enrichedOptions =
-                addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
-        enrichedOptions.put("foo", "bar");
-
-        assertThat(TableStoreManagedFactory.filterLogStoreOptions(enrichedOptions))
-                .containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
-    }
-
     @ParameterizedTest
     @MethodSource("provideResolvedTable")
     public void testCreateAndCheckTableStore(
@@ -322,9 +301,9 @@ public class TableStoreManagedFactoryTest {
                         BUCKET.defaultValue().toString(),
                         ROOT_PATH.key(),
                         sharedTempDir.toString(),
-                        LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
+                        BOOTSTRAP_SERVERS.key(),
                         "localhost:9092",
-                        LOG_PREFIX + LOG_CONSISTENCY.key(),
+                        LOG_CONSISTENCY.key(),
                         LOG_CONSISTENCY.defaultValue().name());
 
         // set configuration under session level
@@ -365,8 +344,7 @@ public class TableStoreManagedFactoryTest {
         Map<String, String> expected = new HashMap<>(enrichedOptions);
         String rootPath = expected.remove(ROOT_PATH.key());
         if (rootPath != null) {
-            String path =
-                    rootPath + "/" + TableStoreManagedFactory.relativeTablePath(TABLE_IDENTIFIER);
+            String path = rootPath + "/" + relativeTablePath(TABLE_IDENTIFIER);
             expected.put(PATH.key(), path);
         }
         return expected;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index 1f321af9..b3284952 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -50,10 +49,10 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
-import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 
 /** End-to-end test base for table store. */
@@ -111,7 +110,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
         Configuration configuration = tEnv.getConfig().getConfiguration();
         configuration.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), rootPath);
         configuration.setString(
-                TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+                TABLE_STORE_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
         if (enableLogStore) {
             configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(), "kafka");
         }
@@ -153,8 +152,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
     }
 
     protected void deleteTablePath() {
-        FileUtils.deleteQuietly(
-                Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile());
+        FileUtils.deleteQuietly(Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile());
     }
 
     /** Expected result wrapper. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 59465a73..1f4cb4e5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -21,7 +21,8 @@ package org.apache.flink.table.store.connector.sink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
@@ -102,10 +103,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
                 KafkaLogTestUtils.testContext(
                         name,
                         getBootstrapServers(),
-                        CoreOptions.LogChangelogMode.AUTO,
-                        transaction
-                                ? CoreOptions.LogConsistency.TRANSACTIONAL
-                                : CoreOptions.LogConsistency.EVENTUAL,
+                        LogChangelogMode.AUTO,
+                        transaction ? LogConsistency.TRANSACTIONAL : LogConsistency.EVENTUAL,
                         TABLE_TYPE,
                         hasPk ? new int[] {2} : new int[0]);
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 4a4be635..16134667 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -156,7 +156,7 @@ public class TestChangelogDataReadWrite {
                         .createEmptyWriter(partition, bucket, service);
         ((MemoryOwner) writer)
                 .setMemoryPool(
-                        new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize));
+                        new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
         return writer;
     }
 }
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
similarity index 58%
copy from flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
index 8cdb6d0c..925a0d13 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
@@ -16,23 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.kafka;
+package org.apache.flink.table.store;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory;
 
-/** Options for kafka log. */
-public class KafkaLogOptions {
+/** Catalog options for table store. */
+public class CatalogOptions {
+    private CatalogOptions() {}
 
-    public static final ConfigOption<String> BOOTSTRAP_SERVERS =
-            ConfigOptions.key("kafka.bootstrap.servers")
+    public static final ConfigOption<String> WAREHOUSE =
+            ConfigOptions.key("warehouse")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Required Kafka server connection string");
+                    .withDescription("The warehouse root path of catalog.");
 
-    public static final ConfigOption<String> TOPIC =
-            ConfigOptions.key("topic")
+    public static final ConfigOption<String> METASTORE =
+            ConfigOptions.key("metastore")
+                    .stringType()
+                    .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
+
+    public static final ConfigOption<String> URI =
+            ConfigOptions.key("uri")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Topic of this kafka table.");
+                    .withDescription("Uri of metastore server.");
 }
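
A minimal usage sketch for the new CatalogOptions class, assuming a plain Flink Configuration; the warehouse path is a placeholder:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CatalogOptions;
    import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory;

    public class CatalogOptionsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString(CatalogOptions.WAREHOUSE.key(), "/tmp/warehouse"); // placeholder path
            conf.setString(CatalogOptions.METASTORE.key(), FileSystemCatalogFactory.IDENTIFIER);
            // CatalogOptions.URI is only needed for metastores that talk to a server.
            System.out.println(conf.get(CatalogOptions.WAREHOUSE));
            System.out.println(conf.get(CatalogOptions.METASTORE));
        }
    }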
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 1df27218..07557caa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -26,16 +27,16 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
+import java.lang.reflect.Field;
 import java.time.Duration;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.TextElement.text;
@@ -43,8 +44,6 @@ import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
 
 /** Core options for table store. */
 public class CoreOptions implements Serializable {
-    public static final String LOG_PREFIX = "log.";
-    public static final String TABLE_STORE_PREFIX = "table-store.";
 
     public static final ConfigOption<Integer> BUCKET =
             ConfigOptions.key("bucket")
@@ -52,6 +51,7 @@ public class CoreOptions implements Serializable {
                     .defaultValue(1)
                     .withDescription("Bucket number for file store.");
 
+    @Internal
     public static final ConfigOption<String> PATH =
             ConfigOptions.key("path")
                     .stringType()
@@ -116,18 +116,17 @@ public class CoreOptions implements Serializable {
                     .defaultValue(Duration.ofSeconds(1))
                     .withDescription("The discovery interval of continuous reading.");
 
-    public static final ConfigOption<CoreOptions.MergeEngine> MERGE_ENGINE =
+    public static final ConfigOption<MergeEngine> MERGE_ENGINE =
             ConfigOptions.key("merge-engine")
-                    .enumType(CoreOptions.MergeEngine.class)
-                    .defaultValue(CoreOptions.MergeEngine.DEDUPLICATE)
+                    .enumType(MergeEngine.class)
+                    .defaultValue(MergeEngine.DEDUPLICATE)
                     .withDescription(
                             Description.builder()
                                     .text("Specify the merge engine for table with primary key.")
                                     .linebreak()
                                     .list(
-                                            formatEnumOption(CoreOptions.MergeEngine.DEDUPLICATE),
-                                            formatEnumOption(
-                                                    CoreOptions.MergeEngine.PARTIAL_UPDATE))
+                                            formatEnumOption(MergeEngine.DEDUPLICATE),
+                                            formatEnumOption(MergeEngine.PARTIAL_UPDATE))
                                     .build());
 
     public static final ConfigOption<WriteMode> WRITE_MODE =
@@ -230,7 +229,7 @@ public class CoreOptions implements Serializable {
                                     + "it can be read directly during stream reads.");
 
     public static final ConfigOption<LogStartupMode> LOG_SCAN =
-            ConfigOptions.key("scan")
+            ConfigOptions.key("log.scan")
                     .enumType(LogStartupMode.class)
                     .defaultValue(LogStartupMode.FULL)
                     .withDescription(
@@ -243,21 +242,21 @@ public class CoreOptions implements Serializable {
                                     .build());
 
     public static final ConfigOption<Long> LOG_SCAN_TIMESTAMP_MILLS =
-            ConfigOptions.key("scan.timestamp-millis")
+            ConfigOptions.key("log.scan.timestamp-millis")
                     .longType()
                     .noDefaultValue()
                     .withDescription(
                             "Optional timestamp used in case of \"from-timestamp\" scan mode");
 
     public static final ConfigOption<Duration> LOG_RETENTION =
-            ConfigOptions.key("retention")
+            ConfigOptions.key("log.retention")
                     .durationType()
                     .noDefaultValue()
                     .withDescription(
                             "It means how long changes log will be kept. The default value is from the log system cluster.");
 
     public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
-            ConfigOptions.key("consistency")
+            ConfigOptions.key("log.consistency")
                     .enumType(LogConsistency.class)
                     .defaultValue(LogConsistency.TRANSACTIONAL)
                     .withDescription(
@@ -270,7 +269,7 @@ public class CoreOptions implements Serializable {
                                     .build());
 
     public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE =
-            ConfigOptions.key("changelog-mode")
+            ConfigOptions.key("log.changelog-mode")
                     .enumType(LogChangelogMode.class)
                     .defaultValue(LogChangelogMode.AUTO)
                     .withDescription(
@@ -284,79 +283,27 @@ public class CoreOptions implements Serializable {
                                     .build());
 
     public static final ConfigOption<String> LOG_KEY_FORMAT =
-            ConfigOptions.key("key.format")
+            ConfigOptions.key("log.key.format")
                     .stringType()
                     .defaultValue("json")
                     .withDescription(
                             "Specify the key message format of log system with primary key.");
 
     public static final ConfigOption<String> LOG_FORMAT =
-            ConfigOptions.key("format")
+            ConfigOptions.key("log.format")
                     .stringType()
                     .defaultValue("debezium-json")
                     .withDescription("Specify the message format of log system.");
 
-    public long writeBufferSize;
+    private final Configuration options;
 
-    public int pageSize;
-
-    public long targetFileSize;
-
-    public int numSortedRunCompactionTrigger;
-
-    public int numSortedRunStopTrigger;
-
-    public int numLevels;
-
-    public boolean commitForceCompact;
-
-    public int maxSizeAmplificationPercent;
-
-    public int sizeRatio;
-
-    public boolean enableChangelogFile;
-
-    private Configuration options;
-
-    public CoreOptions(
-            long writeBufferSize,
-            int pageSize,
-            long targetFileSize,
-            int numSortedRunCompactionTrigger,
-            int numSortedRunStopTrigger,
-            Integer numLevels,
-            boolean commitForceCompact,
-            int maxSizeAmplificationPercent,
-            int sizeRatio,
-            boolean enableChangelogFile) {
-        this.writeBufferSize = writeBufferSize;
-        this.pageSize = pageSize;
-        this.targetFileSize = targetFileSize;
-        this.numSortedRunCompactionTrigger = numSortedRunCompactionTrigger;
-        this.numSortedRunStopTrigger =
-                Math.max(numSortedRunCompactionTrigger, numSortedRunStopTrigger);
-        // By default, this ensures that the compaction does not fall to level 0, but at least to
-        // level 1
-        this.numLevels = numLevels == null ? numSortedRunCompactionTrigger + 1 : numLevels;
-        this.commitForceCompact = commitForceCompact;
-        this.maxSizeAmplificationPercent = maxSizeAmplificationPercent;
-        this.sizeRatio = sizeRatio;
-        this.enableChangelogFile = enableChangelogFile;
+    public CoreOptions(Map<String, String> options) {
+        this(Configuration.fromMap(options));
     }
 
-    public CoreOptions(Configuration config) {
-        this(
-                config.get(WRITE_BUFFER_SIZE).getBytes(),
-                (int) config.get(PAGE_SIZE).getBytes(),
-                config.get(TARGET_FILE_SIZE).getBytes(),
-                config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER),
-                config.get(NUM_SORTED_RUNS_STOP_TRIGGER),
-                config.get(NUM_LEVELS),
-                config.get(COMMIT_FORCE_COMPACT),
-                config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT),
-                config.get(COMPACTION_SIZE_RATIO),
-                config.get(CHANGELOG_FILE));
-        this.options = config;
+    public CoreOptions(Configuration options) {
+        this.options = options;
+        // TODO validate all keys
         Preconditions.checkArgument(
                 snapshotNumRetainMin() > 0,
                 SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
@@ -367,39 +314,6 @@ public class CoreOptions implements Serializable {
                         + SNAPSHOT_NUM_RETAINED_MAX.key());
     }
 
-    public CoreOptions(Map<String, String> options) {
-        this(Configuration.fromMap(options));
-    }
-
-    public static Set<ConfigOption<?>> allOptions() {
-        Set<ConfigOption<?>> allOptions = new HashSet<>();
-        allOptions.add(BUCKET);
-        allOptions.add(PATH);
-        allOptions.add(FILE_FORMAT);
-        allOptions.add(MANIFEST_FORMAT);
-        allOptions.add(MANIFEST_TARGET_FILE_SIZE);
-        allOptions.add(MANIFEST_MERGE_MIN_COUNT);
-        allOptions.add(PARTITION_DEFAULT_NAME);
-        allOptions.add(SNAPSHOT_NUM_RETAINED_MIN);
-        allOptions.add(SNAPSHOT_NUM_RETAINED_MAX);
-        allOptions.add(SNAPSHOT_TIME_RETAINED);
-        allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL);
-        allOptions.add(MERGE_ENGINE);
-        allOptions.add(WRITE_MODE);
-        allOptions.add(SOURCE_SPLIT_TARGET_SIZE);
-        allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST);
-        allOptions.add(WRITE_BUFFER_SIZE);
-        allOptions.add(PAGE_SIZE);
-        allOptions.add(TARGET_FILE_SIZE);
-        allOptions.add(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
-        allOptions.add(NUM_SORTED_RUNS_STOP_TRIGGER);
-        allOptions.add(NUM_LEVELS);
-        allOptions.add(COMMIT_FORCE_COMPACT);
-        allOptions.add(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
-        allOptions.add(COMPACTION_SIZE_RATIO);
-        return allOptions;
-    }
-
     public int bucket() {
         return options.get(BUCKET);
     }
@@ -416,14 +330,6 @@ public class CoreOptions implements Serializable {
         return new Path(options.get(PATH));
     }
 
-    public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
-        return String.format(
-                "%s.catalog/%s.db/%s",
-                tableIdentifier.getCatalogName(),
-                tableIdentifier.getDatabaseName(),
-                tableIdentifier.getObjectName());
-    }
-
     public FileFormat fileFormat() {
         return FileFormat.fromTableOptions(options, FILE_FORMAT);
     }
@@ -464,6 +370,50 @@ public class CoreOptions implements Serializable {
         return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
     }
 
+    public long writeBufferSize() {
+        return options.get(WRITE_BUFFER_SIZE).getBytes();
+    }
+
+    public int pageSize() {
+        return (int) options.get(PAGE_SIZE).getBytes();
+    }
+
+    public long targetFileSize() {
+        return options.get(TARGET_FILE_SIZE).getBytes();
+    }
+
+    public int numSortedRunCompactionTrigger() {
+        return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
+    }
+
+    public int numSortedRunStopTrigger() {
+        return Math.max(numSortedRunCompactionTrigger(), options.get(NUM_SORTED_RUNS_STOP_TRIGGER));
+    }
+
+    public int numLevels() {
+        // By default, this ensures that the compaction does not fall to level 0, but at least to
+        // level 1
+        Integer numLevels = options.get(NUM_LEVELS);
+        numLevels = numLevels == null ? numSortedRunCompactionTrigger() + 1 : numLevels;
+        return numLevels;
+    }
+
+    public boolean commitForceCompact() {
+        return options.get(COMMIT_FORCE_COMPACT);
+    }
+
+    public int maxSizeAmplificationPercent() {
+        return options.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
+    }
+
+    public int sizeRatio() {
+        return options.get(COMPACTION_SIZE_RATIO);
+    }
+
+    public boolean enableChangelogFile() {
+        return options.get(CHANGELOG_FILE);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
@@ -579,4 +529,20 @@ public class CoreOptions implements Serializable {
             return text(description);
         }
     }
+
+    @Internal
+    public static List<ConfigOption<?>> getOptions() {
+        final Field[] fields = CoreOptions.class.getFields();
+        final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
+        for (Field field : fields) {
+            if (ConfigOption.class.isAssignableFrom(field.getType())) {
+                try {
+                    list.add((ConfigOption<?>) field.get(CoreOptions.class));
+                } catch (IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return list;
+    }
 }
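
A minimal sketch of the refactored CoreOptions in use, with placeholder values; it shows the constructor from a plain map and the new accessor methods that replace the former public fields:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.store.CoreOptions;

    public class CoreOptionsSketch {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put(CoreOptions.BUCKET.key(), "4");                 // placeholder
            tableOptions.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "128 mb"); // placeholder
            tableOptions.put(CoreOptions.LOG_SCAN.key(), "latest");          // key is now "log.scan"

            CoreOptions options = new CoreOptions(tableOptions);
            System.out.println(options.bucket());          // 4
            System.out.println(options.writeBufferSize()); // bytes parsed from "128 mb"
            System.out.println(options.numLevels());       // NUM_LEVELS unset, so compaction trigger + 1
        }
    }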
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 6236a39f..897ab2dc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -62,7 +62,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
                 pathFactory(),
                 snapshotManager(),
                 newScan(true),
-                options.targetFileSize);
+                options.targetFileSize());
     }
 
     private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
deleted file mode 100644
index 43065064..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DescribedEnum;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.description.Description;
-import org.apache.flink.configuration.description.InlineElement;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.format.FileFormat;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.time.Duration;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-import static org.apache.flink.configuration.description.TextElement.text;
-import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
-
-/** Options for {@link FileStore}. */
-public class FileStoreOptions implements Serializable {
-
-    public static final String TABLE_STORE_PREFIX = "table-store.";
-
-    public static final ConfigOption<Integer> BUCKET =
-            ConfigOptions.key("bucket")
-                    .intType()
-                    .defaultValue(1)
-                    .withDescription("Bucket number for file store.");
-
-    public static final ConfigOption<String> PATH =
-            ConfigOptions.key("path")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The file path of this table in the filesystem.");
-
-    public static final ConfigOption<String> FILE_FORMAT =
-            ConfigOptions.key("file.format")
-                    .stringType()
-                    .defaultValue("orc")
-                    .withDescription("Specify the message format of data files.");
-
-    public static final ConfigOption<String> MANIFEST_FORMAT =
-            ConfigOptions.key("manifest.format")
-                    .stringType()
-                    .defaultValue("avro")
-                    .withDescription("Specify the message format of manifest files.");
-
-    public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
-            ConfigOptions.key("manifest.target-file-size")
-                    .memoryType()
-                    .defaultValue(MemorySize.ofMebiBytes(8))
-                    .withDescription("Suggested file size of a manifest file.");
-
-    public static final ConfigOption<Integer> MANIFEST_MERGE_MIN_COUNT =
-            ConfigOptions.key("manifest.merge-min-count")
-                    .intType()
-                    .defaultValue(30)
-                    .withDescription(
-                            "To avoid frequent manifest merges, this parameter specifies the minimum number "
-                                    + "of ManifestFileMeta to merge.");
-
-    public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
-            key("partition.default-name")
-                    .stringType()
-                    .defaultValue("__DEFAULT_PARTITION__")
-                    .withDescription(
-                            "The default partition name in case the dynamic partition"
-                                    + " column value is null/empty string.");
-
-    public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN =
-            ConfigOptions.key("snapshot.num-retained.min")
-                    .intType()
-                    .defaultValue(10)
-                    .withDescription("The minimum number of completed snapshots to retain.");
-
-    public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MAX =
-            ConfigOptions.key("snapshot.num-retained.max")
-                    .intType()
-                    .defaultValue(Integer.MAX_VALUE)
-                    .withDescription("The maximum number of completed snapshots to retain.");
-
-    public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
-            ConfigOptions.key("snapshot.time-retained")
-                    .durationType()
-                    .defaultValue(Duration.ofHours(1))
-                    .withDescription("The maximum time of completed snapshots to retain.");
-
-    public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
-            ConfigOptions.key("continuous.discovery-interval")
-                    .durationType()
-                    .defaultValue(Duration.ofSeconds(1))
-                    .withDescription("The discovery interval of continuous reading.");
-
-    public static final ConfigOption<MergeEngine> MERGE_ENGINE =
-            ConfigOptions.key("merge-engine")
-                    .enumType(MergeEngine.class)
-                    .defaultValue(MergeEngine.DEDUPLICATE)
-                    .withDescription(
-                            Description.builder()
-                                    .text("Specify the merge engine for table with primary key.")
-                                    .linebreak()
-                                    .list(
-                                            formatEnumOption(MergeEngine.DEDUPLICATE),
-                                            formatEnumOption(MergeEngine.PARTIAL_UPDATE))
-                                    .build());
-
-    public static final ConfigOption<WriteMode> WRITE_MODE =
-            ConfigOptions.key("write-mode")
-                    .enumType(WriteMode.class)
-                    .defaultValue(WriteMode.CHANGE_LOG)
-                    .withDescription(
-                            Description.builder()
-                                    .text("Specify the write mode for table.")
-                                    .linebreak()
-                                    .list(formatEnumOption(WriteMode.APPEND_ONLY))
-                                    .list(formatEnumOption(WriteMode.CHANGE_LOG))
-                                    .build());
-
-    public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
-            ConfigOptions.key("source.split.target-size")
-                    .memoryType()
-                    .defaultValue(MemorySize.ofMebiBytes(128))
-                    .withDescription("Target size of a source split when scanning a bucket.");
-
-    public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
-            ConfigOptions.key("source.split.open-file-cost")
-                    .memoryType()
-                    .defaultValue(MemorySize.ofMebiBytes(4))
-                    .withDescription(
-                            "Open file cost of a source file. It is used to avoid reading"
-                                    + " too many files with a source split, which can be very slow.");
-
-    private final Configuration options;
-
-    public static Set<ConfigOption<?>> allOptions() {
-        Set<ConfigOption<?>> allOptions = new HashSet<>();
-        allOptions.add(BUCKET);
-        allOptions.add(PATH);
-        allOptions.add(FILE_FORMAT);
-        allOptions.add(MANIFEST_FORMAT);
-        allOptions.add(MANIFEST_TARGET_FILE_SIZE);
-        allOptions.add(MANIFEST_MERGE_MIN_COUNT);
-        allOptions.add(PARTITION_DEFAULT_NAME);
-        allOptions.add(SNAPSHOT_NUM_RETAINED_MIN);
-        allOptions.add(SNAPSHOT_NUM_RETAINED_MAX);
-        allOptions.add(SNAPSHOT_TIME_RETAINED);
-        allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL);
-        allOptions.add(MERGE_ENGINE);
-        allOptions.add(WRITE_MODE);
-        allOptions.add(SOURCE_SPLIT_TARGET_SIZE);
-        allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST);
-        return allOptions;
-    }
-
-    public FileStoreOptions(Map<String, String> options) {
-        this(Configuration.fromMap(options));
-    }
-
-    public FileStoreOptions(Configuration options) {
-        this.options = options;
-        // TODO validate all keys
-        Preconditions.checkArgument(
-                snapshotNumRetainMin() > 0,
-                SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
-        Preconditions.checkArgument(
-                snapshotNumRetainMin() <= snapshotNumRetainMax(),
-                SNAPSHOT_NUM_RETAINED_MIN.key()
-                        + " should not be larger than "
-                        + SNAPSHOT_NUM_RETAINED_MAX.key());
-    }
-
-    public int bucket() {
-        return options.get(BUCKET);
-    }
-
-    public Path path() {
-        return path(options.toMap());
-    }
-
-    public static Path path(Map<String, String> options) {
-        return new Path(options.get(PATH.key()));
-    }
-
-    public static Path path(Configuration options) {
-        return new Path(options.get(PATH));
-    }
-
-    public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
-        return String.format(
-                "%s.catalog/%s.db/%s",
-                tableIdentifier.getCatalogName(),
-                tableIdentifier.getDatabaseName(),
-                tableIdentifier.getObjectName());
-    }
-
-    public FileFormat fileFormat() {
-        return FileFormat.fromTableOptions(options, FILE_FORMAT);
-    }
-
-    public FileFormat manifestFormat() {
-        return FileFormat.fromTableOptions(options, MANIFEST_FORMAT);
-    }
-
-    public MemorySize manifestTargetSize() {
-        return options.get(MANIFEST_TARGET_FILE_SIZE);
-    }
-
-    public String partitionDefaultName() {
-        return options.get(PARTITION_DEFAULT_NAME);
-    }
-
-    public MergeTreeOptions mergeTreeOptions() {
-        return new MergeTreeOptions(options);
-    }
-
-    public int snapshotNumRetainMin() {
-        return options.get(SNAPSHOT_NUM_RETAINED_MIN);
-    }
-
-    public int snapshotNumRetainMax() {
-        return options.get(SNAPSHOT_NUM_RETAINED_MAX);
-    }
-
-    public Duration snapshotTimeRetain() {
-        return options.get(SNAPSHOT_TIME_RETAINED);
-    }
-
-    public int manifestMergeMinCount() {
-        return options.get(MANIFEST_MERGE_MIN_COUNT);
-    }
-
-    public long splitTargetSize() {
-        return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
-    }
-
-    public long splitOpenFileCost() {
-        return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
-    }
-
-    /** Specifies the merge engine for table with primary key. */
-    public enum MergeEngine implements DescribedEnum {
-        DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
-
-        PARTIAL_UPDATE("partial-update", "Partial update non-null fields.");
-
-        private final String value;
-        private final String description;
-
-        MergeEngine(String value, String description) {
-            this.value = value;
-            this.description = description;
-        }
-
-        @Override
-        public String toString() {
-            return value;
-        }
-
-        @Override
-        public InlineElement getDescription() {
-            return text(description);
-        }
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index fb751a73..9443ddd2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -34,6 +34,8 @@ import java.util.function.Supplier;
 /** {@link FileStore} for querying and updating {@link KeyValue}s. */
 public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
 
+    private static final long serialVersionUID = 1L;
+
     private final RowType keyType;
     private final RowType valueType;
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index 403c8be0..608cd266 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.file.catalog;
 
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.Preconditions;
 
@@ -28,6 +26,9 @@ import java.util.List;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.CatalogOptions.METASTORE;
+import static org.apache.flink.table.store.CatalogOptions.WAREHOUSE;
+
 /** Factory to create {@link Catalog}. Each factory should have a unique identifier. */
 public interface CatalogFactory {
 
@@ -35,17 +36,6 @@ public interface CatalogFactory {
 
     Catalog create(String warehouse, ReadableConfig options);
 
-    ConfigOption<String> METASTORE =
-            ConfigOptions.key("metastore")
-                    .stringType()
-                    .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
-
-    ConfigOption<String> WAREHOUSE =
-            ConfigOptions.key("warehouse")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The warehouse root path of catalog.");
-
     static Catalog createCatalog(ReadableConfig options) {
         // manual validation
         // because different catalog types may have different options
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
deleted file mode 100644
index 3f3bfa07..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.mergetree;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.ReadableConfig;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/** Options for merge tree. */
-public class MergeTreeOptions {
-
-    public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
-            ConfigOptions.key("write-buffer-size")
-                    .memoryType()
-                    .defaultValue(MemorySize.parse("256 mb"))
-                    .withDescription(
-                            "Amount of data to build up in memory before converting to a sorted on-disk file.");
-
-    public static final ConfigOption<MemorySize> PAGE_SIZE =
-            ConfigOptions.key("page-size")
-                    .memoryType()
-                    .defaultValue(MemorySize.parse("64 kb"))
-                    .withDescription("Memory page size.");
-
-    public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
-            ConfigOptions.key("target-file-size")
-                    .memoryType()
-                    .defaultValue(MemorySize.ofMebiBytes(128))
-                    .withDescription("Target size of a file.");
-
-    public static final ConfigOption<Integer> NUM_SORTED_RUNS_COMPACTION_TRIGGER =
-            ConfigOptions.key("num-sorted-run.compaction-trigger")
-                    .intType()
-                    .defaultValue(5)
-                    .withDescription(
-                            "The sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and "
-                                    + "high-level runs (one level one sorted run).");
-
-    public static final ConfigOption<Integer> NUM_SORTED_RUNS_STOP_TRIGGER =
-            ConfigOptions.key("num-sorted-run.stop-trigger")
-                    .intType()
-                    .defaultValue(10)
-                    .withDescription(
-                            "The number of sorted-runs that trigger the stopping of writes.");
-
-    public static final ConfigOption<Integer> NUM_LEVELS =
-            ConfigOptions.key("num-levels")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Total level number, for example, there are 3 levels, including 0,1,2 levels.");
-
-    public static final ConfigOption<Boolean> COMMIT_FORCE_COMPACT =
-            ConfigOptions.key("commit.force-compact")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Whether to force a compaction before commit.");
-
-    public static final ConfigOption<Integer> COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT =
-            ConfigOptions.key("compaction.max-size-amplification-percent")
-                    .intType()
-                    .defaultValue(200)
-                    .withDescription(
-                            "The size amplification is defined as the amount (in percentage) of additional storage "
-                                    + "needed to store a single byte of data in the merge tree.");
-
-    public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO =
-            ConfigOptions.key("compaction.size-ratio")
-                    .intType()
-                    .defaultValue(1)
-                    .withDescription(
-                            "Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) "
-                                    + "size is 1% smaller than the next sorted run's size, then include next sorted run "
-                                    + "into this candidate set.");
-
-    public static final ConfigOption<Boolean> CHANGELOG_FILE =
-            ConfigOptions.key("changelog-file")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Whether to double write to a changelog file when flushing memory table. "
-                                    + "This changelog file keeps the order of data input and the details of data changes, "
-                                    + "it can be read directly during stream reads.");
-
-    public final long writeBufferSize;
-
-    public final int pageSize;
-
-    public final long targetFileSize;
-
-    public final int numSortedRunCompactionTrigger;
-
-    public final int numSortedRunStopTrigger;
-
-    public final int numLevels;
-
-    public final boolean commitForceCompact;
-
-    public final int maxSizeAmplificationPercent;
-
-    public final int sizeRatio;
-
-    public final boolean enableChangelogFile;
-
-    public MergeTreeOptions(
-            long writeBufferSize,
-            int pageSize,
-            long targetFileSize,
-            int numSortedRunCompactionTrigger,
-            int numSortedRunStopTrigger,
-            Integer numLevels,
-            boolean commitForceCompact,
-            int maxSizeAmplificationPercent,
-            int sizeRatio,
-            boolean enableChangelogFile) {
-        this.writeBufferSize = writeBufferSize;
-        this.pageSize = pageSize;
-        this.targetFileSize = targetFileSize;
-        this.numSortedRunCompactionTrigger = numSortedRunCompactionTrigger;
-        this.numSortedRunStopTrigger =
-                Math.max(numSortedRunCompactionTrigger, numSortedRunStopTrigger);
-        // By default, this ensures that the compaction does not fall to level 0, but at least to
-        // level 1
-        this.numLevels = numLevels == null ? numSortedRunCompactionTrigger + 1 : numLevels;
-        this.commitForceCompact = commitForceCompact;
-        this.maxSizeAmplificationPercent = maxSizeAmplificationPercent;
-        this.sizeRatio = sizeRatio;
-        this.enableChangelogFile = enableChangelogFile;
-    }
-
-    public MergeTreeOptions(ReadableConfig config) {
-        this(
-                config.get(WRITE_BUFFER_SIZE).getBytes(),
-                (int) config.get(PAGE_SIZE).getBytes(),
-                config.get(TARGET_FILE_SIZE).getBytes(),
-                config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER),
-                config.get(NUM_SORTED_RUNS_STOP_TRIGGER),
-                config.get(NUM_LEVELS),
-                config.get(COMMIT_FORCE_COMPACT),
-                config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT),
-                config.get(COMPACTION_SIZE_RATIO),
-                config.get(CHANGELOG_FILE));
-    }
-
-    public static Set<ConfigOption<?>> allOptions() {
-        Set<ConfigOption<?>> allOptions = new HashSet<>();
-        allOptions.add(WRITE_BUFFER_SIZE);
-        allOptions.add(PAGE_SIZE);
-        allOptions.add(TARGET_FILE_SIZE);
-        allOptions.add(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
-        allOptions.add(NUM_SORTED_RUNS_STOP_TRIGGER);
-        allOptions.add(NUM_LEVELS);
-        allOptions.add(COMMIT_FORCE_COMPACT);
-        allOptions.add(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
-        allOptions.add(COMPACTION_SIZE_RATIO);
-        return allOptions;
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index b06df389..09cd4604 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -85,7 +85,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
                         valueType,
                         fileFormat,
                         pathFactory,
-                        options.targetFileSize);
+                        options.targetFileSize());
         this.keyComparatorSupplier = keyComparatorSupplier;
         this.mergeFunction = mergeFunction;
         this.options = options;
@@ -112,10 +112,10 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
         }
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
         CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
-        Levels levels = new Levels(keyComparator, compactFiles, options.numLevels);
+        Levels levels = new Levels(keyComparator, compactFiles, options.numLevels());
         CompactUnit unit =
                 CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns());
-        return new CompactTask(keyComparator, options.targetFileSize, rewriter, unit, true);
+        return new CompactTask(keyComparator, options.targetFileSize(), rewriter, unit, true);
     }
 
     private MergeTreeWriter createMergeTreeWriter(
@@ -132,18 +132,18 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
                         partition,
                         bucket,
                         new UniversalCompaction(
-                                options.maxSizeAmplificationPercent,
-                                options.sizeRatio,
-                                options.numSortedRunCompactionTrigger),
+                                options.maxSizeAmplificationPercent(),
+                                options.sizeRatio(),
+                                options.numSortedRunCompactionTrigger()),
                         compactExecutor),
-                new Levels(keyComparator, restoreFiles, options.numLevels),
+                new Levels(keyComparator, restoreFiles, options.numLevels()),
                 getMaxSequenceNumber(restoreFiles),
                 keyComparator,
                 mergeFunction.copy(),
                 dataFileWriter,
-                options.commitForceCompact,
-                options.numSortedRunStopTrigger,
-                options.enableChangelogFile);
+                options.commitForceCompact(),
+                options.numSortedRunStopTrigger(),
+                options.enableChangelogFile());
     }
 
     private CompactManager createCompactManager(
@@ -154,7 +154,11 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
         CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
         return new CompactManager(
-                compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter);
+                compactExecutor,
+                compactStrategy,
+                keyComparator,
+                options.targetFileSize(),
+                rewriter);
     }
 
     private CompactRewriter compactRewriter(
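
For context, a sketch of the compaction knobs read above, built from placeholder values via the new CoreOptions accessors:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.store.CoreOptions;

    public class CompactionKnobsSketch {
        public static void main(String[] args) {
            Map<String, String> conf = new HashMap<>();
            conf.put(CoreOptions.COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT.key(), "200"); // placeholder
            conf.put(CoreOptions.COMPACTION_SIZE_RATIO.key(), "1");                       // placeholder
            conf.put(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key(), "5");          // placeholder
            conf.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "3");                // placeholder

            CoreOptions options = new CoreOptions(conf);
            // The three arguments handed to UniversalCompaction above.
            System.out.println(options.maxSizeAmplificationPercent());
            System.out.println(options.sizeRatio());
            System.out.println(options.numSortedRunCompactionTrigger());
            // The stop trigger is clamped to at least the compaction trigger.
            System.out.println(options.numSortedRunStopTrigger()); // prints 5, not 3
        }
    }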
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
deleted file mode 100644
index f975233d..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.log;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.DescribedEnum;
-import org.apache.flink.configuration.description.Description;
-import org.apache.flink.configuration.description.InlineElement;
-
-import java.time.Duration;
-
-import static org.apache.flink.configuration.description.TextElement.text;
-import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
-
-/** Options for log store. */
-public class LogOptions {
-
-    public static final String LOG_PREFIX = "log.";
-
-    public static final ConfigOption<LogStartupMode> SCAN =
-            ConfigOptions.key("scan")
-                    .enumType(LogStartupMode.class)
-                    .defaultValue(LogStartupMode.FULL)
-                    .withDescription(
-                            Description.builder()
-                                    .text("Specify the startup mode for log consumer.")
-                                    .linebreak()
-                                    .list(formatEnumOption(LogStartupMode.FULL))
-                                    .list(formatEnumOption(LogStartupMode.LATEST))
-                                    .list(formatEnumOption(LogStartupMode.FROM_TIMESTAMP))
-                                    .build());
-
-    public static final ConfigOption<Long> SCAN_TIMESTAMP_MILLS =
-            ConfigOptions.key("scan.timestamp-millis")
-                    .longType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Optional timestamp used in case of \"from-timestamp\" scan mode.");
-
-    public static final ConfigOption<Duration> RETENTION =
-            ConfigOptions.key("retention")
-                    .durationType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Defines how long the change log will be kept. The default value comes from the log system cluster.");
-
-    public static final ConfigOption<LogConsistency> CONSISTENCY =
-            ConfigOptions.key("consistency")
-                    .enumType(LogConsistency.class)
-                    .defaultValue(LogConsistency.TRANSACTIONAL)
-                    .withDescription(
-                            Description.builder()
-                                    .text("Specify the log consistency mode for table.")
-                                    .linebreak()
-                                    .list(
-                                            formatEnumOption(LogConsistency.TRANSACTIONAL),
-                                            formatEnumOption(LogConsistency.EVENTUAL))
-                                    .build());
-
-    public static final ConfigOption<LogChangelogMode> CHANGELOG_MODE =
-            ConfigOptions.key("changelog-mode")
-                    .enumType(LogChangelogMode.class)
-                    .defaultValue(LogChangelogMode.AUTO)
-                    .withDescription(
-                            Description.builder()
-                                    .text("Specify the log changelog mode for table.")
-                                    .linebreak()
-                                    .list(
-                                            formatEnumOption(LogChangelogMode.AUTO),
-                                            formatEnumOption(LogChangelogMode.ALL),
-                                            formatEnumOption(LogChangelogMode.UPSERT))
-                                    .build());
-
-    public static final ConfigOption<String> KEY_FORMAT =
-            ConfigOptions.key("key.format")
-                    .stringType()
-                    .defaultValue("json")
-                    .withDescription(
-                            "Specify the key message format of log system with primary key.");
-
-    public static final ConfigOption<String> FORMAT =
-            ConfigOptions.key("format")
-                    .stringType()
-                    .defaultValue("debezium-json")
-                    .withDescription("Specify the message format of log system.");
-
-    /** Specifies the startup mode for log consumer. */
-    public enum LogStartupMode implements DescribedEnum {
-        FULL(
-                "full",
-                "Perform a snapshot on the table upon first startup,"
-                        + " and continue to read the latest changes."),
-
-        LATEST("latest", "Start from the latest."),
-
-        FROM_TIMESTAMP("from-timestamp", "Start from user-supplied timestamp.");
-
-        private final String value;
-        private final String description;
-
-        LogStartupMode(String value, String description) {
-            this.value = value;
-            this.description = description;
-        }
-
-        @Override
-        public String toString() {
-            return value;
-        }
-
-        @Override
-        public InlineElement getDescription() {
-            return text(description);
-        }
-    }
-
-    /** Specifies the log consistency mode for table. */
-    public enum LogConsistency implements DescribedEnum {
-        TRANSACTIONAL(
-                "transactional",
-                "Only the data after the checkpoint can be seen by readers; the latency depends on the checkpoint interval."),
-
-        EVENTUAL(
-                "eventual",
-                "Immediate data visibility; you may see some intermediate states, "
-                        + "but eventually the correct results will be produced. Only works for tables with a primary key.");
-
-        private final String value;
-        private final String description;
-
-        LogConsistency(String value, String description) {
-            this.value = value;
-            this.description = description;
-        }
-
-        @Override
-        public String toString() {
-            return value;
-        }
-
-        @Override
-        public InlineElement getDescription() {
-            return text(description);
-        }
-    }
-
-    /** Specifies the log changelog mode for table. */
-    public enum LogChangelogMode implements DescribedEnum {
-        AUTO("auto", "Upsert for tables with a primary key, all for tables without a primary key."),
-
-        ALL("all", "The log system stores all changes including UPDATE_BEFORE."),
-
-        UPSERT(
-                "upsert",
-                "The log system does not store the UPDATE_BEFORE changes; the consuming job"
-                        + " will automatically add a normalize node, relying on state"
-                        + " to generate the required UPDATE_BEFORE.");
-
-        private final String value;
-        private final String description;
-
-        LogChangelogMode(String value, String description) {
-            this.value = value;
-            this.description = description;
-        }
-
-        @Override
-        public String toString() {
-            return value;
-        }
-
-        @Override
-        public InlineElement getDescription() {
-            return text(description);
-        }
-    }
-}
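
The options deleted here are not dropped; as the static imports further down
show (CoreOptions.LOG_FORMAT and LOG_KEY_FORMAT in LogStoreTableFactory,
CoreOptions.LOG_SCAN, LOG_RETENTION, LOG_CONSISTENCY and LOG_CHANGELOG_MODE in
KafkaLogStoreFactory), they move into CoreOptions keyed under the former
"log." prefix. A hedged sketch of what such a relocated declaration could look
like; the defaults and descriptions come from the removed file above, while the
exact keys and class layout in CoreOptions may differ:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    /** Sketch of relocated log options; not the actual CoreOptions source. */
    public class RelocatedLogOptionsSketch {

        public static final ConfigOption<String> LOG_KEY_FORMAT =
                ConfigOptions.key("log.key.format")
                        .stringType()
                        .defaultValue("json")
                        .withDescription(
                                "Specify the key message format of log system with primary key.");

        public static final ConfigOption<String> LOG_FORMAT =
                ConfigOptions.key("log.format")
                        .stringType()
                        .defaultValue("debezium-json")
                        .withDescription("Specify the message format of log system.");
    }
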
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 4ae4db06..0df149ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -34,13 +34,15 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.flink.table.factories.SerializationFormatFactory;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
 import java.util.Map;
 
+import static org.apache.flink.table.store.CoreOptions.LOG_FORMAT;
+import static org.apache.flink.table.store.CoreOptions.LOG_KEY_FORMAT;
+
 /**
  * Base interface for configuring a default log table connector. The log table is used by managed
  * table factory.
@@ -83,36 +85,32 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
     static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(
             TableFactoryHelper helper) {
         DecodingFormat<DeserializationSchema<RowData>> format =
-                helper.discoverDecodingFormat(
-                        DeserializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT);
-        validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT));
+                helper.discoverDecodingFormat(DeserializationFormatFactory.class, LOG_KEY_FORMAT);
+        validateKeyFormat(format, helper.getOptions().get(LOG_KEY_FORMAT));
         return format;
     }
 
     static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(
             TableFactoryHelper helper) {
         EncodingFormat<SerializationSchema<RowData>> format =
-                helper.discoverEncodingFormat(
-                        SerializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT);
-        validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT));
+                helper.discoverEncodingFormat(SerializationFormatFactory.class, LOG_KEY_FORMAT);
+        validateKeyFormat(format, helper.getOptions().get(LOG_KEY_FORMAT));
         return format;
     }
 
     static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
             TableFactoryHelper helper) {
         DecodingFormat<DeserializationSchema<RowData>> format =
-                helper.discoverDecodingFormat(
-                        DeserializationFormatFactory.class, CoreOptions.LOG_FORMAT);
-        validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT));
+                helper.discoverDecodingFormat(DeserializationFormatFactory.class, LOG_FORMAT);
+        validateValueFormat(format, helper.getOptions().get(LOG_FORMAT));
         return format;
     }
 
     static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
             TableFactoryHelper helper) {
         EncodingFormat<SerializationSchema<RowData>> format =
-                helper.discoverEncodingFormat(
-                        SerializationFormatFactory.class, CoreOptions.LOG_FORMAT);
-        validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT));
+                helper.discoverEncodingFormat(SerializationFormatFactory.class, LOG_FORMAT);
+        validateValueFormat(format, helper.getOptions().get(LOG_FORMAT));
         return format;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
index 1e997477..1f49b1b7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
@@ -42,7 +42,7 @@ public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T> {
         super(write, recordConverter);
 
         HeapMemorySegmentPool memoryPool =
-                new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize);
+                new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize());
         this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, this::memoryOwners);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 2de27d14..eecb0f30 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -122,7 +122,7 @@ public class MergeTreeTest {
                                 valueType,
                                 flushingAvro,
                                 pathFactory,
-                                options.targetFileSize)
+                                options.targetFileSize())
                         .create(BinaryRowDataUtil.EMPTY_ROW, 0);
         writer = createMergeTreeWriter(Collections.emptyList());
     }
@@ -271,15 +271,16 @@ public class MergeTreeTest {
                         dataFileWriter.keyType(),
                         dataFileWriter.valueType(),
                         createCompactManager(dataFileWriter, service),
-                        new Levels(comparator, files, options.numLevels),
+                        new Levels(comparator, files, options.numLevels()),
                         maxSequenceNumber,
                         comparator,
                         new DeduplicateMergeFunction(),
                         dataFileWriter,
-                        options.commitForceCompact,
-                        options.numSortedRunStopTrigger,
+                        options.commitForceCompact(),
+                        options.numSortedRunStopTrigger(),
                         false);
-        writer.setMemoryPool(new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize));
+        writer.setMemoryPool(
+                new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
         return writer;
     }
 
@@ -287,9 +288,9 @@ public class MergeTreeTest {
             DataFileWriter dataFileWriter, ExecutorService compactExecutor) {
         CompactStrategy compactStrategy =
                 new UniversalCompaction(
-                        options.maxSizeAmplificationPercent,
-                        options.sizeRatio,
-                        options.numSortedRunCompactionTrigger);
+                        options.maxSizeAmplificationPercent(),
+                        options.sizeRatio(),
+                        options.numSortedRunCompactionTrigger());
         CompactRewriter rewriter =
                 (outputLevel, dropDelete, sections) ->
                         dataFileWriter.write(
@@ -302,7 +303,7 @@ public class MergeTreeTest {
                                                 new DeduplicateMergeFunction())),
                                 outputLevel);
         return new CompactManager(
-                compactExecutor, compactStrategy, comparator, options.targetFileSize, rewriter);
+                compactExecutor, compactStrategy, comparator, options.targetFileSize(), rewriter);
     }
 
     private void mergeCompacted(
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
index da385956..487e3198 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
@@ -74,7 +74,7 @@ public class LogStoreE2eTest extends E2eTestBase {
                         + "    'root-path' = '%s',\n"
                         + "    'log.consistency' = 'eventual',\n"
                         + "    'log.system' = 'kafka',\n"
-                        + "    'log.kafka.bootstrap.servers' = '%s'\n"
+                        + "    'kafka.bootstrap.servers' = '%s'\n"
                         + ");";
         tableStoreStreamDdl =
                 String.format(
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
index 1e2cbabb..ec0dc2d1 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
@@ -18,24 +18,18 @@
 
 package org.apache.flink.table.store.hive;
 
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
 import org.apache.flink.util.Preconditions;
 
+import static org.apache.flink.table.store.CatalogOptions.URI;
+
 /** Factory to create {@link HiveCatalog}. */
 public class HiveCatalogFactory implements CatalogFactory {
 
     private static final String IDENTIFIER = "hive";
 
-    private static final ConfigOption<String> URI =
-            ConfigOptions.key("uri")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Uri of Hive metastore's thrift server.");
-
     @Override
     public String identifier() {
         return IDENTIFIER;
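
The metastore URI option removed here is now imported from CatalogOptions
instead of being declared inline in each catalog factory. A sketch of such a
shared declaration, mirroring the removed definition; the real CatalogOptions
class likely carries further catalog-level options beyond this one:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    /** Sketch of a shared catalog options class; shows only the URI option. */
    public class CatalogOptionsSketch {

        public static final ConfigOption<String> URI =
                ConfigOptions.key("uri")
                        .stringType()
                        .noDefaultValue()
                        .withDescription("Uri of Hive metastore's thrift server.");
    }
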
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
index 8cdb6d0c..4b4527b8 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
@@ -31,7 +31,7 @@ public class KafkaLogOptions {
                     .withDescription("Required Kafka server connection string");
 
     public static final ConfigOption<String> TOPIC =
-            ConfigOptions.key("topic")
+            ConfigOptions.key("kafka.topic")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Topic of this kafka table.");
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 85870e9f..d05a06cc 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -57,8 +57,6 @@ import java.util.concurrent.ExecutionException;
 import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.CoreOptions.LOG_FORMAT;
-import static org.apache.flink.table.store.CoreOptions.LOG_KEY_FORMAT;
 import static org.apache.flink.table.store.CoreOptions.LOG_RETENTION;
 import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
 import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
@@ -88,16 +86,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(LOG_SCAN);
-        options.add(TOPIC);
-        options.add(LOG_SCAN_TIMESTAMP_MILLS);
-        options.add(LOG_RETENTION);
-        options.add(LOG_CONSISTENCY);
-        options.add(LOG_CHANGELOG_MODE);
-        options.add(LOG_KEY_FORMAT);
-        options.add(LOG_FORMAT);
-        return options;
+        return new HashSet<>();
     }
 
     @Override
@@ -119,12 +108,10 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
 
     @Override
     public void onCreateTable(Context context, int numBucket, boolean ignoreIfExists) {
-        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
-        helper.validateExcept(KAFKA_PREFIX);
-        try (AdminClient adminClient = AdminClient.create(toKafkaProperties(helper.getOptions()))) {
+        Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
+        try (AdminClient adminClient = AdminClient.create(toKafkaProperties(options))) {
             Map<String, String> configs = new HashMap<>();
-            helper.getOptions()
-                    .getOptional(LOG_RETENTION)
+            options.getOptional(LOG_RETENTION)
                     .ifPresent(
                             retention ->
                                     configs.put(
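
onCreateTable now wraps the catalog table's raw option map in a Configuration
instead of going through a TableFactoryHelper, so no prefix validation happens
at this point. A small self-contained sketch of that access pattern; the option
keys and values below are made up for illustration:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.configuration.Configuration;

    /** Illustrative: reading table options the way the factory now does. */
    public class CatalogTableOptionsSketch {

        public static void main(String[] args) {
            // Made-up options as they might be stored on a catalog table.
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("kafka.bootstrap.servers", "kafka:9092");
            tableOptions.put("kafka.topic", "word_count_log");

            // Same pattern as the factory: wrap the raw map, no helper involved.
            Configuration options = Configuration.fromMap(tableOptions);
            System.out.println(options.getString("kafka.topic", "<none>"));
        }
    }
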
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 343525e6..8c82a433 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.types.DataType;
@@ -56,8 +58,6 @@ import java.util.stream.IntStream;
 import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.CoreOptions.LogChangelogMode;
-import static org.apache.flink.table.store.CoreOptions.LogConsistency;
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;