Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/15 13:16:22 UTC

[flink-table-store] branch master updated: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 0139e9e  [FLINK-26535] Introduce StoreTableSource And StoreTableSink
0139e9e is described below

commit 0139e9ef70c353bf441814b045a6372f227abcfd
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Mar 15 21:16:14 2022 +0800

    [FLINK-26535] Introduce StoreTableSource And StoreTableSink
    
    This closes #41
---
 flink-table-store-connector/pom.xml                |  40 ++
 .../flink/table/store/connector/TableStore.java    |  36 +-
 .../table/store/connector/TableStoreFactory.java   | 121 ++++-
 .../table/store/connector/sink/TableStoreSink.java | 168 ++++++
 .../store/connector/source/FileStoreSource.java    |  27 +-
 .../store/connector/source/TableStoreSource.java   | 232 +++++++++
 .../table/store/connector/CreateTableITCase.java   | 238 +++++++++
 .../table/store/connector/DropTableITCase.java     | 242 +++++++++
 .../store/connector/ReadWriteTableITCase.java      | 300 +++++++++++
 .../table/store/connector/ShowCreateUtil.java      | 180 +++++++
 .../store/connector/TableStoreFactoryTest.java     |  48 +-
 .../table/store/connector/TableStoreITCase.java    | 576 ---------------------
 .../table/store/connector/TableStoreTestBase.java  | 198 +++++++
 .../store/file/mergetree/MergeTreeOptions.java     |  16 +
 .../store/file/predicate/PredicateConverter.java   |  13 +
 15 files changed, 1828 insertions(+), 607 deletions(-)
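
For context, a minimal end-to-end sketch of how the new source and sink are exercised through SQL. This is not part of the commit; the option keys ('table-store.file.path', 'change-tracking') are inferred from the factory code below, and the table name, schema and path are illustrative placeholders.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TableStoreQuickStart {
        public static void main(String[] args) throws Exception {
            // Batch mode keeps the read bounded; in streaming mode with
            // change-tracking enabled the source becomes a hybrid of
            // FileStoreSource and the log (Kafka) source.
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // Session-level root dir, mirroring the hint in TableStoreFactory#tablePath:
            // SET 'table-store.file.path' = '...'
            tEnv.getConfig()
                    .getConfiguration()
                    .setString("table-store.file.path", "/tmp/table-store");

            // A managed table: no 'connector' option, so TableStoreFactory creates the
            // TableStoreSource / TableStoreSink introduced by this commit. Change
            // tracking is disabled here so no Kafka log store is needed.
            tEnv.executeSql(
                    "CREATE TABLE word_count ("
                            + "  word STRING,"
                            + "  cnt BIGINT,"
                            + "  PRIMARY KEY (word) NOT ENFORCED"
                            + ") WITH ('change-tracking' = 'false')");

            // Writes go through TableStoreSink ...
            tEnv.executeSql("INSERT INTO word_count VALUES ('hello', 1), ('world', 2)").await();

            // ... and reads go through TableStoreSource (a plain FileStoreSource in batch mode).
            tEnv.executeSql("SELECT * FROM word_count").print();
        }
    }
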

diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index e4e8f38..f7baacb 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -151,6 +151,46 @@ under the License.
         </dependency>
 
         <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-orc</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Tests: Hadoop required by ORC -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>${junit4.version}</version>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index a8667f1..534cbe7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -108,6 +108,23 @@ public class TableStore {
         return this;
     }
 
+    public boolean partitioned() {
+        return partitions.length > 0;
+    }
+
+    public boolean valueCountMode() {
+        return primaryKeys.length == 0;
+    }
+
+    public List<String> fieldNames() {
+        return type.getFieldNames();
+    }
+
+    public List<String> partitionKeys() {
+        RowType partitionType = ProjectionUtils.project(type, partitions);
+        return partitionType.getFieldNames();
+    }
+
     public SourceBuilder sourceBuilder() {
         return new SourceBuilder();
     }
@@ -154,7 +171,9 @@ public class TableStore {
 
         @Nullable private int[][] projectedFields;
 
-        @Nullable private Predicate predicate;
+        @Nullable private Predicate partitionPredicate;
+
+        @Nullable private Predicate fieldPredicate;
 
         @Nullable private LogSourceProvider logSourceProvider;
 
@@ -163,8 +182,13 @@ public class TableStore {
             return this;
         }
 
-        public SourceBuilder withPredicate(Predicate predicate) {
-            this.predicate = predicate;
+        public SourceBuilder withPartitionPredicate(Predicate partitionPredicate) {
+            this.partitionPredicate = partitionPredicate;
+            return this;
+        }
+
+        public SourceBuilder withFieldPredicate(Predicate fieldPredicate) {
+            this.fieldPredicate = fieldPredicate;
             return this;
         }
 
@@ -186,7 +210,11 @@ public class TableStore {
         private FileStoreSource buildFileStoreSource() {
             FileStore fileStore = buildFileStore();
             return new FileStoreSource(
-                    fileStore, primaryKeys.length == 0, projectedFields, predicate);
+                    fileStore,
+                    primaryKeys.length == 0,
+                    projectedFields,
+                    partitionPredicate,
+                    fieldPredicate);
         }
 
         public Source<RowData, ?, ?> build() {
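
The SourceBuilder now takes two predicates instead of one: a partition predicate evaluated against the partition keys and a field predicate evaluated against keys or values. Below is a rough sketch of wiring it up programmatically, mirroring what TableStoreSource#getScanRuntimeProvider does later in this patch; the empty Configuration and the two-column schema are placeholders, and the real file-store options (file.path etc.) would have to be present for build() to succeed.

    import org.apache.flink.api.connector.source.Source;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.connector.TableStore;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class SourceBuilderSketch {

        public static Source<RowData, ?, ?> buildBatchSource() {
            // Schema: dt (partition key) and cnt, no primary key -> value count mode.
            RowType rowType =
                    RowType.of(
                            new LogicalType[] {new VarCharType(), new BigIntType()},
                            new String[] {"dt", "cnt"});

            TableStore store =
                    new TableStore(new Configuration()) // placeholder file-store options
                            .withTableIdentifier(ObjectIdentifier.of("cat", "db", "t"))
                            .withSchema(rowType)
                            .withPrimaryKeys(new int[0])
                            .withPartitions(new int[] {0});

            // The predicates normally come from PredicateConverter.convert(...) after
            // SupportsFilterPushDown#applyFilters has classified the pushed filters;
            // null simply means "no filter".
            return store.sourceBuilder()
                    .withContinuousMode(false)
                    .withHybridMode(false)
                    .withProjection(null)
                    .withPartitionPredicate(null) // e.g. dt = '2022-03-15'
                    .withFieldPredicate(null)     // e.g. cnt > 100
                    .build();
        }
    }
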
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index e846a0a..731f5c3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -19,16 +19,30 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.connector.source.TableStoreSource;
 import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -36,17 +50,20 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
 import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
 import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
 
 /** Default implementation of {@link ManagedTableFactory}. */
-public class TableStoreFactory implements ManagedTableFactory {
+public class TableStoreFactory
+        implements ManagedTableFactory, DynamicTableSourceFactory, DynamicTableSinkFactory {
 
     @Override
     public Map<String, String> enrichOptions(Context context) {
@@ -130,6 +147,41 @@ public class TableStoreFactory implements ManagedTableFactory {
     }
 
     @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        boolean streaming =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+        Context logStoreContext = null;
+        LogStoreTableFactory logStoreTableFactory = null;
+
+        if (enableChangeTracking(context.getCatalogTable().getOptions())) {
+            logStoreContext = createLogContext(context);
+            logStoreTableFactory = createLogStoreTableFactory(context);
+        }
+        return new TableStoreSource(
+                buildTableStore(context), streaming, logStoreContext, logStoreTableFactory);
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        Map<String, String> options = context.getCatalogTable().getOptions();
+        Context logStoreContext = null;
+        LogStoreTableFactory logStoreTableFactory = null;
+        if (enableChangeTracking(options)) {
+            logStoreContext = createLogContext(context);
+            logStoreTableFactory = createLogStoreTableFactory(context);
+        }
+        return new TableStoreSink(
+                buildTableStore(context),
+                LogOptions.LogChangelogMode.valueOf(
+                        options.getOrDefault(
+                                LOG_PREFIX + CHANGELOG_MODE.key(),
+                                CHANGELOG_MODE.defaultValue().toString().toUpperCase())),
+                logStoreContext,
+                logStoreTableFactory);
+    }
+
+    @Override
     public Set<ConfigOption<?>> requiredOptions() {
         return Collections.emptySet();
     }
@@ -137,13 +189,14 @@ public class TableStoreFactory implements ManagedTableFactory {
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
+        options.addAll(MergeTreeOptions.allOptions());
         options.add(CHANGE_TRACKING);
         return options;
     }
 
     // ~ Tools ------------------------------------------------------------------
 
-    private LogStoreTableFactory createLogStoreTableFactory(Context context) {
+    private static LogStoreTableFactory createLogStoreTableFactory(Context context) {
         return discoverLogStoreFactory(
                 context.getClassLoader(),
                 context.getCatalogTable()
@@ -151,7 +204,7 @@ public class TableStoreFactory implements ManagedTableFactory {
                         .getOrDefault(LOG_SYSTEM.key(), LOG_SYSTEM.defaultValue()));
     }
 
-    private Context createLogContext(Context context) {
+    private static Context createLogContext(Context context) {
         return new FactoryUtil.DefaultDynamicTableContext(
                 context.getObjectIdentifier(),
                 context.getCatalogTable()
@@ -163,20 +216,35 @@ public class TableStoreFactory implements ManagedTableFactory {
     }
 
     @VisibleForTesting
-    Map<String, String> filterLogStoreOptions(Map<String, String> enrichedOptions) {
-        Map<String, String> logStoreOptions = new HashMap<>();
-        enrichedOptions.forEach(
-                (k, v) -> {
-                    if (k.startsWith(LOG_PREFIX)) {
-                        logStoreOptions.put(k.substring(LOG_PREFIX.length()), v);
-                    }
-                });
-        return logStoreOptions;
+    static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
+        return options.entrySet().stream()
+                .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
+                .collect(
+                        Collectors.toMap(
+                                entry -> entry.getKey().substring(LOG_PREFIX.length()),
+                                Map.Entry::getValue));
     }
 
-    private static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
+    @VisibleForTesting
+    static Map<String, String> filterFileStoreOptions(Map<String, String> options) {
+        return options.entrySet().stream()
+                .filter(entry -> !entry.getKey().startsWith(LOG_PREFIX))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @VisibleForTesting
+    static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
+        Preconditions.checkArgument(
+                options.containsKey(FILE_PATH.key()),
+                String.format(
+                        "Failed to create file store path. "
+                                + "Please specify a root dir by setting session level configuration "
+                                + "as `SET 'table-store.%s' = '...'`. "
+                                + "Alternatively, you can use a per-table root dir "
+                                + "as `CREATE TABLE ${table} (...) WITH ('%s' = '...')`",
+                        FILE_PATH.key(), FILE_PATH.key()));
         return new Path(
-                new Path(options.get(FILE_PATH.key())),
+                options.get(FILE_PATH.key()),
                 String.format(
                         "root/%s.catalog/%s.db/%s",
                         identifier.getCatalogName(),
@@ -184,9 +252,32 @@ public class TableStoreFactory implements ManagedTableFactory {
                         identifier.getObjectName()));
     }
 
-    private static boolean enableChangeTracking(Map<String, String> options) {
+    @VisibleForTesting
+    static boolean enableChangeTracking(Map<String, String> options) {
         return Boolean.parseBoolean(
                 options.getOrDefault(
                         CHANGE_TRACKING.key(), CHANGE_TRACKING.defaultValue().toString()));
     }
+
+    private TableStore buildTableStore(Context context) {
+        ResolvedCatalogTable catalogTable = context.getCatalogTable();
+        ResolvedSchema schema = catalogTable.getResolvedSchema();
+        RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
+        int[] primaryKeys = new int[0];
+        if (schema.getPrimaryKey().isPresent()) {
+            primaryKeys =
+                    schema.getPrimaryKey().get().getColumns().stream()
+                            .mapToInt(rowType.getFieldNames()::indexOf)
+                            .toArray();
+        }
+        return new TableStore(
+                        Configuration.fromMap(filterFileStoreOptions(catalogTable.getOptions())))
+                .withTableIdentifier(context.getObjectIdentifier())
+                .withSchema(rowType)
+                .withPrimaryKeys(primaryKeys)
+                .withPartitions(
+                        catalogTable.getPartitionKeys().stream()
+                                .mapToInt(rowType.getFieldNames()::indexOf)
+                                .toArray());
+    }
 }
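
The option handling is now split into two static helpers: everything carrying the LOG_PREFIX ("log.") goes to the log store with the prefix stripped, everything else stays with the file store. A small illustration, placed in the same package so the @VisibleForTesting methods are reachable; the concrete option keys used here are only examples.

    package org.apache.flink.table.store.connector;

    import java.util.HashMap;
    import java.util.Map;

    public class OptionSplitExample {

        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("bucket", "4");                  // file store / merge tree option
            tableOptions.put("log.changelog-mode", "upsert"); // log store option, prefixed

            // Kept as-is for FileStoreOptions / MergeTreeOptions: {bucket=4}
            Map<String, String> fileOptions =
                    TableStoreFactory.filterFileStoreOptions(tableOptions);

            // Handed to the discovered LogStoreTableFactory with "log." stripped:
            // {changelog-mode=upsert}
            Map<String, String> logOptions =
                    TableStoreFactory.filterLogStoreOptions(tableOptions);

            System.out.println(fileOptions);
            System.out.println(logOptions);
        }
    }
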
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
new file mode 100644
index 0000000..9b442a3
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class TableStoreSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    private final LogOptions.LogChangelogMode logChangelogMode;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    private Map<String, String> staticPartitions = new HashMap<>();
+    private boolean overwrite = false;
+    @Nullable private CatalogLock.Factory lockFactory;
+
+    public TableStoreSink(
+            TableStore tableStore,
+            LogOptions.LogChangelogMode logChangelogMode,
+            @Nullable DynamicTableFactory.Context logStoreContext,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        this.tableStore = tableStore;
+        this.logChangelogMode = logChangelogMode;
+        this.logStoreContext = logStoreContext;
+        this.logStoreTableFactory = logStoreTableFactory;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        if (!tableStore.valueCountMode()
+                && logChangelogMode == LogOptions.LogChangelogMode.UPSERT) {
+            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+            for (RowKind kind : requestedMode.getContainedKinds()) {
+                if (kind != RowKind.UPDATE_BEFORE) {
+                    builder.addContainedKind(kind);
+                }
+            }
+            return builder.build();
+        }
+        return requestedMode;
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        LogSinkProvider logSinkProvider = null;
+        if (logStoreTableFactory != null) {
+            logSinkProvider =
+                    logStoreTableFactory.createSinkProvider(
+                            logStoreContext,
+                            new LogStoreTableFactory.SinkContext() {
+                                @Override
+                                public boolean isBounded() {
+                                    return context.isBounded();
+                                }
+
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        DataType consumedDataType) {
+                                    return context.createTypeInformation(consumedDataType);
+                                }
+
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        LogicalType consumedLogicalType) {
+                                    return context.createTypeInformation(consumedLogicalType);
+                                }
+
+                                @Override
+                                public DynamicTableSink.DataStructureConverter
+                                        createDataStructureConverter(DataType consumedDataType) {
+                                    return context.createDataStructureConverter(consumedDataType);
+                                }
+                            });
+        }
+        final LogSinkProvider finalLogSinkProvider = logSinkProvider;
+        return (DataStreamSinkProvider)
+                (providerContext, dataStream) ->
+                        tableStore
+                                .sinkBuilder()
+                                .withInput(
+                                        new DataStream<>(
+                                                dataStream.getExecutionEnvironment(),
+                                                dataStream.getTransformation()))
+                                .withLockFactory(lockFactory)
+                                .withLogSinkProvider(finalLogSinkProvider)
+                                .withOverwritePartition(overwrite ? staticPartitions : null)
+                                .build();
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        TableStoreSink copied =
+                new TableStoreSink(
+                        tableStore, logChangelogMode, logStoreContext, logStoreTableFactory);
+        copied.staticPartitions = new HashMap<>(staticPartitions);
+        copied.overwrite = overwrite;
+        copied.lockFactory = lockFactory;
+        return copied;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TableStoreSink";
+    }
+
+    @Override
+    public void applyStaticPartition(Map<String, String> partition) {
+        tableStore
+                .partitionKeys()
+                .forEach(
+                        partitionKey -> {
+                            if (partition.containsKey(partitionKey)) {
+                                this.staticPartitions.put(
+                                        partitionKey, partition.get(partitionKey));
+                            }
+                        });
+    }
+
+    @Override
+    public void applyOverwrite(boolean overwrite) {
+        this.overwrite = overwrite;
+    }
+
+    @Override
+    public void setLockFactory(@Nullable CatalogLock.Factory lockFactory) {
+        this.lockFactory = lockFactory;
+    }
+}
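
TableStoreSink implements SupportsPartitioning and SupportsOverwrite, so an INSERT OVERWRITE with a static partition spec ends up in applyStaticPartition/applyOverwrite and only replaces the matching partition via withOverwritePartition. A hedged sketch, reusing a batch TableEnvironment like the one in the quick-start near the top of this mail; table and option names are illustrative.

    import org.apache.flink.table.api.TableEnvironment;

    public class OverwriteSketch {

        // tEnv: a batch-mode TableEnvironment with 'table-store.file.path' set,
        // as in the quick-start sketch above.
        public static void overwriteOnePartition(TableEnvironment tEnv) throws Exception {
            tEnv.executeSql(
                    "CREATE TABLE IF NOT EXISTS sales ("
                            + "  dt STRING, item STRING, amount BIGINT"
                            + ") PARTITIONED BY (dt) WITH ('change-tracking' = 'false')");

            // Plain INSERT: the sink is used with overwrite = false.
            tEnv.executeSql("INSERT INTO sales VALUES ('2022-03-15', 'flink', 100)").await();

            // INSERT OVERWRITE (batch mode only) with a static partition spec drives
            // applyOverwrite(true) and applyStaticPartition({dt=2022-03-15}); only
            // that partition is rewritten, i.e. withOverwritePartition(staticPartitions).
            tEnv.executeSql(
                            "INSERT OVERWRITE sales PARTITION (dt = '2022-03-15') "
                                    + "VALUES ('table-store', 200)")
                    .await();
        }
    }
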
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index b595df5..6a72c55 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -47,17 +47,21 @@ public class FileStoreSource
 
     @Nullable private final int[][] projectedFields;
 
-    @Nullable private final Predicate predicate;
+    @Nullable private final Predicate partitionPredicate;
+
+    @Nullable private final Predicate fieldPredicate;
 
     public FileStoreSource(
             FileStore fileStore,
             boolean valueCountMode,
             @Nullable int[][] projectedFields,
-            @Nullable Predicate predicate) {
+            @Nullable Predicate partitionPredicate,
+            final Predicate fieldPredicate) {
         this.fileStore = fileStore;
         this.valueCountMode = valueCountMode;
         this.projectedFields = projectedFields;
-        this.predicate = predicate;
+        this.partitionPredicate = partitionPredicate;
+        this.fieldPredicate = fieldPredicate;
     }
 
     @Override
@@ -84,14 +88,15 @@ public class FileStoreSource
     public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context) {
         FileStoreScan scan = fileStore.newScan();
-        if (predicate != null) {
-            // TODO split predicate into partitionPredicate and fieldsPredicate
-            //            scan.withPartitionFilter(partitionPredicate);
-            //            if (keyAsRecord) {
-            //                scan.withKeyFilter(fieldsPredicate);
-            //            } else {
-            //                scan.withValueFilter(fieldsPredicate);
-            //            }
+        if (partitionPredicate != null) {
+            scan.withPartitionFilter(partitionPredicate);
+        }
+        if (fieldPredicate != null) {
+            if (valueCountMode) {
+                scan.withKeyFilter(fieldPredicate);
+            } else {
+                scan.withValueFilter(fieldPredicate);
+            }
         }
         return new StaticFileStoreSplitEnumerator(context, scan);
     }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
new file mode 100644
index 0000000..66c2da2
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
+ * For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource} and kafka
+ * log source created by {@link LogSourceProvider}.
+ */
+public class TableStoreSource
+        implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    private List<ResolvedExpression> partitionFilters = new ArrayList<>();
+    private List<ResolvedExpression> fieldFilters = new ArrayList<>();
+    @Nullable private int[][] projectFields;
+
+    public TableStoreSource(
+            TableStore tableStore,
+            boolean streaming,
+            @Nullable DynamicTableFactory.Context logStoreContext,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logStoreContext = logStoreContext;
+        this.logStoreTableFactory = logStoreTableFactory;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return streaming
+                ? tableStore.valueCountMode()
+                        ? ChangelogMode.all()
+                        // TODO: optimize upsert when consistency mode is transactional and
+                        // log.changelog-mode is all
+                        : ChangelogMode.upsert()
+                : ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        LogSourceProvider logSourceProvider = null;
+        if (logStoreTableFactory != null) {
+            logSourceProvider =
+                    logStoreTableFactory.createSourceProvider(
+                            logStoreContext,
+                            new LogStoreTableFactory.SourceContext() {
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        DataType producedDataType) {
+                                    return scanContext.createTypeInformation(producedDataType);
+                                }
+
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        LogicalType producedLogicalType) {
+                                    return scanContext.createTypeInformation(producedLogicalType);
+                                }
+
+                                @Override
+                                public DataStructureConverter createDataStructureConverter(
+                                        DataType producedDataType) {
+                                    return scanContext.createDataStructureConverter(
+                                            producedDataType);
+                                }
+                            });
+        }
+        TableStore.SourceBuilder builder =
+                tableStore
+                        .sourceBuilder()
+                        .withContinuousMode(streaming)
+                        .withHybridMode(streaming && logSourceProvider != null)
+                        .withLogSourceProvider(logSourceProvider)
+                        .withProjection(projectFields)
+                        .withPartitionPredicate(PredicateConverter.convert(partitionFilters))
+                        .withFieldPredicate(PredicateConverter.convert(fieldFilters));
+        return SourceProvider.of(builder.build());
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        TableStoreSource copied =
+                new TableStoreSource(tableStore, streaming, logStoreContext, logStoreTableFactory);
+        copied.partitionFilters = new ArrayList<>(partitionFilters);
+        copied.fieldFilters = new ArrayList<>(fieldFilters);
+        copied.projectFields = projectFields;
+        return copied;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TableStoreSource";
+    }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        if (tableStore.partitioned()) {
+            classifyFilters(filters);
+        } else {
+            fieldFilters = filters;
+        }
+        return Result.of(
+                new ArrayList<>(filters),
+                fieldFilters == null ? Collections.emptyList() : fieldFilters);
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        return false;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.projectFields = projectedFields;
+    }
+
+    private void classifyFilters(List<ResolvedExpression> filters) {
+        List<String> fieldNames = tableStore.fieldNames();
+        List<String> partitionKeys = tableStore.partitionKeys();
+        PartitionIndexVisitor visitor =
+                new PartitionIndexVisitor(
+                        fieldNames.stream().mapToInt(partitionKeys::indexOf).toArray());
+        filters.forEach(
+                filter -> {
+                    try {
+                        partitionFilters.add(filter.accept(visitor));
+                    } catch (FoundFieldReference e) {
+                        fieldFilters.add(filter);
+                    }
+                });
+    }
+
+    private static class PartitionIndexVisitor implements ExpressionVisitor<ResolvedExpression> {
+
+        private final int[] mapping;
+
+        PartitionIndexVisitor(int[] mapping) {
+            this.mapping = mapping;
+        }
+
+        @Override
+        public ResolvedExpression visit(CallExpression call) {
+            return CallExpression.anonymous(
+                    call.getFunctionDefinition(),
+                    call.getResolvedChildren().stream()
+                            .map(e -> e.accept(this))
+                            .collect(Collectors.toList()),
+                    call.getOutputDataType());
+        }
+
+        @Override
+        public ResolvedExpression visit(ValueLiteralExpression valueLiteral) {
+            return valueLiteral;
+        }
+
+        @Override
+        public ResolvedExpression visit(FieldReferenceExpression fieldReference) {
+            int adjustIndex = mapping[fieldReference.getFieldIndex()];
+            if (adjustIndex == -1) {
+                // not a partition field
+                throw new FoundFieldReference();
+            }
+            return new FieldReferenceExpression(
+                    fieldReference.getName(),
+                    fieldReference.getOutputDataType(),
+                    fieldReference.getInputIndex(),
+                    adjustIndex);
+        }
+
+        @Override
+        public ResolvedExpression visit(TypeLiteralExpression typeLiteral) {
+            return typeLiteral;
+        }
+
+        @Override
+        public ResolvedExpression visit(Expression other) {
+            return (ResolvedExpression) other;
+        }
+    }
+
+    private static class FoundFieldReference extends RuntimeException {}
+}
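
applyFilters only classifies predicates when the table is partitioned: each pushed conjunct is visited by PartitionIndexVisitor, which remaps field indices to their position among the partition keys and bails out (FoundFieldReference) on any non-partition column, moving that conjunct to the field filters. A short illustration against the partitioned 'sales' table from the previous sketch; the query itself is illustrative.

    import org.apache.flink.table.api.TableEnvironment;

    public class FilterClassificationSketch {

        public static void query(TableEnvironment tEnv) {
            // The planner pushes the two conjuncts separately into applyFilters.
            // dt = '2022-03-15' only touches the partition key, so it becomes a
            // partition filter (the field index of dt is remapped to its position
            // in the partition row); amount > 100 references a non-partition
            // column, so PartitionIndexVisitor throws FoundFieldReference and the
            // conjunct stays in fieldFilters. A projection like [item, amount] is
            // applied via applyProjection. All of this is then wired into the
            // SourceBuilder as withProjection / withPartitionPredicate /
            // withFieldPredicate.
            tEnv.executeSql(
                            "SELECT item, amount FROM sales "
                                    + "WHERE dt = '2022-03-15' AND amount > 100")
                    .print();
        }
    }
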
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
new file mode 100644
index 0000000..083aaa0
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for testing create managed table ddl. */
+@RunWith(Parameterized.class)
+public class CreateTableITCase extends TableStoreTestBase {
+
+    protected final boolean ignoreException;
+
+    protected ResolvedCatalogTable resolvedTable =
+            createResolvedTable(
+                    Collections.emptyMap(),
+                    RowType.of(new IntType(), new VarCharType()),
+                    Collections.emptyList(),
+                    new int[0]);
+
+    public CreateTableITCase(
+            RuntimeExecutionMode executionMode,
+            String tableName,
+            boolean enableChangeTracking,
+            boolean ignoreException,
+            ExpectedResult expectedResult) {
+        super(executionMode, tableName, enableChangeTracking, expectedResult);
+        this.ignoreException = ignoreException;
+    }
+
+    @Test
+    public void testCreateTable() {
+        final String ddl =
+                ShowCreateUtil.buildShowCreateTable(
+                        resolvedTable, tableIdentifier, ignoreException);
+        if (expectedResult.success) {
+            tEnv.executeSql(ddl);
+            // check catalog
+            assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
+                    .isPresent();
+            // check table store
+            assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+                    .exists();
+            // check log store
+            assertThat(topicExists(tableIdentifier.asSummaryString()))
+                    .isEqualTo(enableChangeTracking);
+        } else {
+            // check inconsistency between catalog/file store/log store
+            assertThat(ignoreException).isFalse();
+            assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                    .getCause()
+                    .isInstanceOf(expectedResult.expectedType)
+                    .hasMessageContaining(expectedResult.expectedMessage);
+
+            if (expectedResult.expectedMessage.contains(
+                    String.format("already exists in Catalog %s", CURRENT_CATALOG))) {
+                assertThat(
+                                ((TableEnvironmentImpl) tEnv)
+                                        .getCatalogManager()
+                                        .getTable(tableIdentifier))
+                        .isPresent();
+            } else {
+                // throw exception when creating file path/topic, and catalog meta does not
+                // exist
+                assertThat(
+                                ((TableEnvironmentImpl) tEnv)
+                                        .getCatalogManager()
+                                        .getTable(tableIdentifier))
+                        .isNotPresent();
+            }
+        }
+    }
+
+    @Override
+    public void prepareEnv() {
+        if (expectedResult.success) {
+            // ensure catalog doesn't contain the table meta
+            tEnv.getCatalog(tEnv.getCurrentCatalog())
+                    .ifPresent(
+                            (catalog) -> {
+                                try {
+                                    catalog.dropTable(tableIdentifier.toObjectPath(), false);
+                                } catch (TableNotExistException ignored) {
+                                    // ignored
+                                }
+                            });
+            // ensure log store doesn't exist the topic
+            if (enableChangeTracking && !ignoreException) {
+                deleteTopicIfExists(tableIdentifier.asSummaryString());
+            }
+        } else if (expectedResult.expectedMessage.startsWith("Failed to create file store path.")) {
+            // failed when creating file store
+            Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile().mkdirs();
+        } else if (expectedResult.expectedMessage.startsWith("Failed to create kafka topic.")) {
+            // failed when creating log store
+            createTopicIfNotExists(tableIdentifier.asSummaryString(), BUCKET.defaultValue());
+        } else {
+            // failed when registering schema to catalog
+            tEnv.getCatalog(tEnv.getCurrentCatalog())
+                    .ifPresent(
+                            (catalog) -> {
+                                try {
+                                    catalog.createTable(
+                                            tableIdentifier.toObjectPath(), resolvedTable, false);
+                                } catch (TableAlreadyExistException
+                                        | DatabaseNotExistException ignored) {
+                                    // ignored
+                                }
+                            });
+        }
+    }
+
+    @Parameterized.Parameters(
+            name =
+                    "executionMode-{0}, tableName-{1}, enableChangeTracking-{2}, ignoreException-{3}, expectedResult-{4}")
+    public static List<Object[]> data() {
+        List<Object[]> specs = new ArrayList<>();
+        // successful case specs
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    true,
+                    true,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    false,
+                    true,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    true,
+                    false,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    false,
+                    false,
+                    new ExpectedResult().success(true)
+                });
+
+        // failed case specs
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    false,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableException.class)
+                            .expectedMessage("Failed to create file store path.")
+                });
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    true,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableException.class)
+                            .expectedMessage("Failed to create kafka topic.")
+                });
+
+        final String tableName = "table_" + UUID.randomUUID();
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    tableName,
+                    true,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableAlreadyExistException.class)
+                            .expectedMessage(
+                                    String.format(
+                                            "Table (or view) %s already exists in Catalog %s.",
+                                            ObjectIdentifier.of(
+                                                            CURRENT_CATALOG,
+                                                            CURRENT_DATABASE,
+                                                            tableName)
+                                                    .toObjectPath()
+                                                    .getFullName(),
+                                            CURRENT_CATALOG))
+                });
+        return specs;
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
new file mode 100644
index 0000000..c683604
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for testing drop managed table ddl. */
+@RunWith(Parameterized.class)
+public class DropTableITCase extends TableStoreTestBase {
+
+    protected final boolean ignoreException;
+
+    protected ResolvedCatalogTable resolvedTable =
+            createResolvedTable(
+                    Collections.emptyMap(),
+                    RowType.of(new IntType(), new VarCharType()),
+                    Collections.emptyList(),
+                    new int[0]);
+
+    public DropTableITCase(
+            RuntimeExecutionMode executionMode,
+            String tableName,
+            boolean enableChangeTracking,
+            boolean ignoreException,
+            ExpectedResult expectedResult) {
+        super(executionMode, tableName, enableChangeTracking, expectedResult);
+        this.ignoreException = ignoreException;
+    }
+
+    @Test
+    public void testDropTable() {
+        String ddl =
+                String.format(
+                        "DROP TABLE%s%s\n",
+                        ignoreException ? " IF EXISTS " : " ",
+                        tableIdentifier.asSerializableString());
+
+        if (expectedResult.success) {
+            tEnv.executeSql(ddl);
+            // check catalog
+            assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
+                    .isNotPresent();
+            // check table store
+            assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+                    .doesNotExist();
+            // check log store
+            assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
+        } else {
+            // check inconsistency between catalog/file store/log store
+            assertThat(ignoreException).isFalse();
+            if (ValidationException.class.isAssignableFrom(expectedResult.expectedType)) {
+                // successfully delete path/topic, but schema doesn't exist in catalog
+                assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                        .isInstanceOf(expectedResult.expectedType)
+                        .hasMessageContaining(expectedResult.expectedMessage);
+                assertThat(
+                                ((TableEnvironmentImpl) tEnv)
+                                        .getCatalogManager()
+                                        .getTable(tableIdentifier))
+                        .isNotPresent();
+            } else {
+                assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                        .getCause()
+                        .isInstanceOf(expectedResult.expectedType)
+                        .hasMessageContaining(expectedResult.expectedMessage);
+                // throw exception when deleting file path/topic, so schema still exists in
+                // catalog
+                assertThat(
+                                ((TableEnvironmentImpl) tEnv)
+                                        .getCatalogManager()
+                                        .getTable(tableIdentifier))
+                        .isPresent();
+            }
+        }
+    }
+
+    @Override
+    public void prepareEnv() {
+        ((TableEnvironmentImpl) tEnv)
+                .getCatalogManager()
+                .createTable(resolvedTable, tableIdentifier, false);
+        if (expectedResult.success) {
+            if (ignoreException) {
+                // delete catalog schema does not affect dropping the table
+                tEnv.getCatalog(tEnv.getCurrentCatalog())
+                        .ifPresent(
+                                (catalog) -> {
+                                    try {
+                                        catalog.dropTable(tableIdentifier.toObjectPath(), false);
+                                    } catch (TableNotExistException ignored) {
+                                        // ignored
+                                    }
+                                });
+                // delete file store path does not affect dropping the table
+                deleteTablePath();
+                // delete log store topic does not affect dropping the table
+                if (enableChangeTracking) {
+                    deleteTopicIfExists(tableIdentifier.asSummaryString());
+                }
+            }
+        } else if (expectedResult.expectedMessage.startsWith("Failed to delete file store path.")) {
+            // failed when deleting file path
+            deleteTablePath();
+        } else if (expectedResult.expectedMessage.startsWith("Failed to delete kafka topic.")) {
+            // failed when deleting topic
+            deleteTopicIfExists(tableIdentifier.asSummaryString());
+        } else {
+            // failed when dropping catalog schema
+            tEnv.getCatalog(tEnv.getCurrentCatalog())
+                    .ifPresent(
+                            (catalog) -> {
+                                try {
+                                    catalog.dropTable(tableIdentifier.toObjectPath(), false);
+                                } catch (TableNotExistException ignored) {
+                                    // ignored
+                                }
+                            });
+        }
+    }
+
+    @Parameterized.Parameters(
+            name =
+                    "executionMode-{0}, tableName-{1}, enableChangeTracking-{2}, ignoreException-{3}, expectedResult-{4}")
+    public static List<Object[]> data() {
+        List<Object[]> specs = new ArrayList<>();
+        // successful case specs
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    true,
+                    true,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    false,
+                    true,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    true,
+                    false,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    false,
+                    false,
+                    new ExpectedResult().success(true)
+                });
+
+        // failed case specs
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    false,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableException.class)
+                            .expectedMessage("Failed to delete file store path.")
+                });
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    true,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableException.class)
+                            .expectedMessage("Failed to delete kafka topic.")
+                });
+        final String tableName = "table_" + UUID.randomUUID();
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    tableName,
+                    true,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(ValidationException.class)
+                            .expectedMessage(
+                                    String.format(
+                                            "Table with identifier '%s' does not exist.",
+                                            ObjectIdentifier.of(
+                                                            CURRENT_CATALOG,
+                                                            CURRENT_DATABASE,
+                                                            tableName)
+                                                    .asSummaryString()))
+                });
+        return specs;
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
new file mode 100644
index 0000000..eff7b77
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for reading from and writing to a managed table via DML. */
+@RunWith(Parameterized.class)
+public class ReadWriteTableITCase extends TableStoreTestBase {
+
+    private final boolean hasPk;
+    @Nullable private final Boolean duplicate;
+
+    public ReadWriteTableITCase(
+            RuntimeExecutionMode executionMode,
+            String tableName,
+            boolean enableChangeTracking,
+            boolean hasPk,
+            @Nullable Boolean duplicate,
+            ExpectedResult expectedResult) {
+        super(executionMode, tableName, enableChangeTracking, expectedResult);
+        this.hasPk = hasPk;
+        this.duplicate = duplicate;
+    }
+
+    @Override
+    public void after() {
+        tEnv.executeSql("DROP TABLE `source_table`");
+        super.after();
+    }
+
+    @Test
+    public void testReadWriteNonPartitioned() throws Exception {
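+        // insert the registered source data into the managed table, then read it back and verify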
+        String statement =
+                String.format("INSERT INTO %s \nSELECT * FROM `source_table`", tableIdentifier);
+        if (expectedResult.success) {
+            tEnv.executeSql(statement).await();
+            TableResult result =
+                    tEnv.executeSql(String.format("SELECT * FROM %s", tableIdentifier));
+            List<Row> actual = new ArrayList<>();
+            try (CloseableIterator<Row> iterator = result.collect()) {
+                while (iterator.hasNext()) {
+                    actual.add(iterator.next());
+                }
+            }
+            assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedResult.expectedRecords);
+            String relativeFilePath = getRelativeFileStoreTablePath(tableIdentifier);
+            // check snapshot file path
+            assertThat(Paths.get(rootPath, relativeFilePath, "snapshot")).exists();
+            // check manifest file path
+            assertThat(Paths.get(rootPath, relativeFilePath, "manifest")).exists();
+
+            if (enableChangeTracking) {
+                assertThat(topicExists(tableIdentifier.asSummaryString())).isTrue();
+            }
+        } else {
+            assertThatThrownBy(
+                            () -> {
+                                tEnv.executeSql(statement).await();
+                                tEnv.executeSql(String.format("SELECT * FROM %s", tableIdentifier))
+                                        .collect();
+                            })
+                    .isInstanceOf(expectedResult.expectedType)
+                    .hasMessageContaining(expectedResult.expectedMessage);
+        }
+    }
+
+    @Parameterized.Parameters(
+            name =
+                    "executionMode-{0}, tableName-{1}, "
+                            + "enableChangeTracking-{2}, hasPk-{3},"
+                            + " duplicate-{4}, expectedResult-{5}")
+    public static List<Object[]> data() {
+        List<Object[]> specs = new ArrayList<>();
+        // batch cases
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.BATCH,
+                    "table_" + UUID.randomUUID(),
+                    false, // enable change-tracking
+                    false, // has pk
+                    false, // without duplicate
+                    new ExpectedResult().success(true).expectedRecords(insertOnlyCities(false))
+                });
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.BATCH,
+                    "table_" + UUID.randomUUID(),
+                    false, // enable change-tracking
+                    false, // has pk
+                    true, // with duplicate
+                    new ExpectedResult().success(true).expectedRecords(insertOnlyCities(true))
+                });
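+        // with a primary key on currency, the earlier Euro rate (index 1) is merged away by the
+        // later one, so it is removed from the expected result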
+        List<Row> expected = new ArrayList<>(rates());
+        expected.remove(1);
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.BATCH,
+                    "table_" + UUID.randomUUID(),
+                    false, // enable change-tracking
+                    true, // has pk
+                    null, // without delete
+                    new ExpectedResult().success(true).expectedRecords(expected)
+                });
+        // TODO: streaming with change-tracking
+
+        // exception case
+        specs.add(
+                new Object[] {
+                    RuntimeExecutionMode.STREAMING,
+                    "table_" + UUID.randomUUID(),
+                    false, // enable change-tracking
+                    false, // has pk
+                    null, // insert/update/delete changelog
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(UnsupportedOperationException.class)
+                            .expectedMessage("File store continuous mode is not supported yet.")
+                });
+
+        // TODO: add overwrite case
+
+        return specs;
+    }
+
+    @Override
+    protected void prepareEnv() {
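+        // register a values source matching the spec: upsert records for a pk table in streaming
+        // mode, insert-only records (optionally duplicated) or a full changelog otherwise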
+        if (hasPk) {
+            if (executionMode == RuntimeExecutionMode.STREAMING) {
+                registerUpsertRecordsWithPk();
+            } else {
+                registerInsertOnlyRecordsWithPk();
+            }
+        } else {
+            if (duplicate != null) {
+                registerInsertOnlyRecordsWithoutPk(duplicate);
+            } else {
+                registerInsertUpdateDeleteRecordsWithoutPk();
+            }
+        }
+    }
+
+    private void registerInsertUpdateDeleteRecordsWithoutPk() {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE source_table (\n"
+                                + "  user_id STRING,\n"
+                                + "  user_name STRING,\n"
+                                + "  email STRING,\n"
+                                + "  balance DECIMAL(18,2)\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'bounded' = '%s',\n"
+                                + " 'data-id' = '%s',\n"
+                                + " 'changelog-mode' = 'I,UA,UB,D',\n"
+                                + " 'disable-lookup' = 'true'\n"
+                                + ")",
+                        executionMode == RuntimeExecutionMode.BATCH,
+                        registerData(TestData.userChangelog())));
+        registerTableStoreSink();
+    }
+
+    private void registerInsertOnlyRecordsWithoutPk(boolean duplicate) {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE source_table (\n"
+                                + "  name STRING NOT NULL,\n"
+                                + "  state STRING NOT NULL,\n"
+                                + "  pop INT NOT NULL\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'bounded' = '%s',\n"
+                                + " 'data-id' = '%s',\n"
+                                + " 'changelog-mode' = 'I'\n"
+                                + ")",
+                        executionMode == RuntimeExecutionMode.BATCH,
+                        registerData(insertOnlyCities(duplicate))));
+        registerTableStoreSink();
+    }
+
+    private void registerInsertOnlyRecordsWithPk() {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE source_table (\n"
+                                + "  currency STRING,\n"
+                                + "  rate BIGINT,\n"
+                                + "  PRIMARY KEY (currency) NOT ENFORCED\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + " 'bounded' = '%s',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'changelog-mode' = 'I',\n"
+                                + "  'disable-lookup' = 'true'\n"
+                                + ")",
+                        executionMode == RuntimeExecutionMode.BATCH, registerData(rates())));
+        registerTableStoreSink();
+    }
+
+    private void registerUpsertRecordsWithPk() {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE source_table (\n"
+                                + "  currency STRING,\n"
+                                + "  rate BIGINT,\n"
+                                + "  PRIMARY KEY (currency) NOT ENFORCED\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + " 'bounded' = '%s',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'changelog-mode' = 'UA,D',\n"
+                                + "  'disable-lookup' = 'true'\n"
+                                + ")",
+                        executionMode == RuntimeExecutionMode.BATCH,
+                        registerData(ratesChangelog())));
+        registerTableStoreSink();
+    }
+
+    private static List<Row> rates() {
+        return Arrays.asList(
+                changelogRow("+I", "US Dollar", 102L),
+                changelogRow("+I", "Euro", 114L),
+                changelogRow("+I", "Yen", 1L),
+                changelogRow("+I", "Euro", 119L));
+    }
+
+    private static List<Row> ratesChangelog() {
+        return Arrays.asList(
+                changelogRow("+I", "US Dollar", 102L),
+                changelogRow("+I", "Euro", 114L),
+                changelogRow("+I", "Yen", 1L),
+                changelogRow("+U", "Euro", 116L),
+                changelogRow("-D", "Euro", 116L),
+                changelogRow("+I", "Euro", 119L),
+                changelogRow("+U", "Euro", 119L),
+                changelogRow("-D", "Yen", 1L));
+    }
+
+    private static List<Row> insertOnlyCities(boolean duplicate) {
+        List<Row> cities = JavaConverters.seqAsJavaList(TestData.citiesData());
+        return duplicate
+                ? Stream.concat(cities.stream(), cities.stream()).collect(Collectors.toList())
+                : cities;
+    }
+
+    private static List<Row> userChangelog() {
+        return Arrays.asList(
+                changelogRow("+I", "user1", "Tom", "tom123@gmail.com", new BigDecimal("8.10")),
+                changelogRow("+I", "user3", "Bailey", "bailey@qq.com", new BigDecimal("9.99")),
+                changelogRow("+I", "user4", "Tina", "tina@gmail.com", new BigDecimal("11.30")));
+    }
+
+    private void registerTableStoreSink() {
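+        // copy the source schema but exclude the 'values' connector options, so the new table is
+        // treated as a managed table backed by the table store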
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE %s LIKE `source_table` (EXCLUDING OPTIONS)",
+                        tableIdentifier.asSerializableString()));
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ShowCreateUtil.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ShowCreateUtil.java
new file mode 100644
index 0000000..118d5c3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ShowCreateUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for building SHOW CREATE statements.
+ *
+ * <p>This code is mostly copied from {@link org.apache.flink.table.api.internal.ShowCreateUtil}.
+ */
+public class ShowCreateUtil {
+
+    private ShowCreateUtil() {}
+
+    static String buildShowCreateTable(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier tableIdentifier,
+            boolean ignoreIfExists) {
+        if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE TABLE is only supported for tables, but %s is a view. Please use SHOW CREATE VIEW instead.",
+                            tableIdentifier.asSerializableString()));
+        }
+        final String printIndent = "  ";
+        StringBuilder sb =
+                new StringBuilder()
+                        .append(buildCreateFormattedPrefix(ignoreIfExists, tableIdentifier));
+        sb.append(extractFormattedColumns(table, printIndent));
+        extractFormattedWatermarkSpecs(table, printIndent)
+                .ifPresent(watermarkSpecs -> sb.append(",\n").append(watermarkSpecs));
+        extractFormattedPrimaryKey(table, printIndent).ifPresent(pk -> sb.append(",\n").append(pk));
+        sb.append("\n) ");
+        extractFormattedComment(table)
+                .ifPresent(
+                        c -> sb.append(String.format("COMMENT '%s'%s", c, System.lineSeparator())));
+        extractFormattedPartitionedInfo((ResolvedCatalogTable) table)
+                .ifPresent(
+                        partitionedInfoFormatted ->
+                                sb.append("PARTITIONED BY (")
+                                        .append(partitionedInfoFormatted)
+                                        .append(")\n"));
+        extractFormattedOptions(table, printIndent)
+                .ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n"));
+        return sb.toString();
+    }
+
+    static String extractFormattedColumns(ResolvedCatalogBaseTable<?> table, String printIndent) {
+        return table.getResolvedSchema().getColumns().stream()
+                .map(column -> String.format("%s%s", printIndent, getColumnString(column)))
+                .collect(Collectors.joining(",\n"));
+    }
+
+    static Optional<String> extractFormattedWatermarkSpecs(
+            ResolvedCatalogBaseTable<?> table, String printIndent) {
+        if (table.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(
+                table.getResolvedSchema().getWatermarkSpecs().stream()
+                        .map(
+                                watermarkSpec ->
+                                        String.format(
+                                                "%sWATERMARK FOR %s AS %s",
+                                                printIndent,
+                                                EncodingUtils.escapeIdentifier(
+                                                        watermarkSpec.getRowtimeAttribute()),
+                                                watermarkSpec
+                                                        .getWatermarkExpression()
+                                                        .asSerializableString()))
+                        .collect(Collectors.joining("\n")));
+    }
+
+    static Optional<String> extractFormattedComment(ResolvedCatalogBaseTable<?> table) {
+        String comment = table.getComment();
+        if (StringUtils.isNotEmpty(comment)) {
+            return Optional.of(EncodingUtils.escapeSingleQuotes(comment));
+        }
+        return Optional.empty();
+    }
+
+    static Optional<String> extractFormattedPartitionedInfo(ResolvedCatalogTable catalogTable) {
+        if (!catalogTable.isPartitioned()) {
+            return Optional.empty();
+        }
+        return Optional.of(
+                catalogTable.getPartitionKeys().stream()
+                        .map(EncodingUtils::escapeIdentifier)
+                        .collect(Collectors.joining(", ")));
+    }
+
+    static Optional<String> extractFormattedOptions(
+            ResolvedCatalogBaseTable<?> table, String printIndent) {
+        if (Objects.isNull(table.getOptions()) || table.getOptions().isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(
+                table.getOptions().entrySet().stream()
+                        .map(
+                                entry ->
+                                        String.format(
+                                                "%s'%s' = '%s'",
+                                                printIndent,
+                                                EncodingUtils.escapeSingleQuotes(entry.getKey()),
+                                                EncodingUtils.escapeSingleQuotes(entry.getValue())))
+                        .collect(Collectors.joining(",\n")));
+    }
+
+    static String buildCreateFormattedPrefix(boolean ignoreIfExists, ObjectIdentifier identifier) {
+        return String.format(
+                "CREATE TABLE%s %s (%s",
+                ignoreIfExists ? " IF NOT EXISTS " : "",
+                identifier.asSerializableString(),
+                System.lineSeparator());
+    }
+
+    static Optional<String> extractFormattedPrimaryKey(
+            ResolvedCatalogBaseTable<?> table, String printIndent) {
+        Optional<UniqueConstraint> primaryKey = table.getResolvedSchema().getPrimaryKey();
+        return primaryKey.map(
+                uniqueConstraint -> String.format("%s%s", printIndent, uniqueConstraint));
+    }
+
+    static String getColumnString(Column column) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(EncodingUtils.escapeIdentifier(column.getName()));
+        sb.append(" ");
+        // skip data type for computed column
+        if (column instanceof Column.ComputedColumn) {
+            sb.append(
+                    column.explainExtras()
+                            .orElseThrow(
+                                    () ->
+                                            new TableException(
+                                                    String.format(
+                                                            "Column expression can not be null for computed column '%s'",
+                                                            column.getName()))));
+        } else {
+            sb.append(column.getDataType().getLogicalType().asSerializableString());
+            column.explainExtras()
+                    .ifPresent(
+                            e -> {
+                                sb.append(" ");
+                                sb.append(e);
+                            });
+        }
+        // TODO: Print the column comment until FLINK-18958 is fixed
+        return sb.toString();
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
index e0c9b5b..9b8d006 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
@@ -158,10 +158,48 @@ public class TableStoreFactoryTest {
                 addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
         enrichedOptions.put("foo", "bar");
 
-        assertThat(((TableStoreFactory) tableStoreFactory).filterLogStoreOptions(enrichedOptions))
+        assertThat(TableStoreFactory.filterLogStoreOptions(enrichedOptions))
                 .containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
     }
 
+    @Test
+    public void testFilterFileStoreOptions() {
+        // mix in an invalid key and leave its value empty to emphasize the deferred validation
+        Map<String, String> expectedFileStoreOptions =
+                of("dummy.key", "", FILE_PATH.key(), "dummy:/foo/bar");
+        Map<String, String> enrichedOptions = new HashMap<>(expectedFileStoreOptions);
+        enrichedOptions.put("log.foo", "bar");
+        enrichedOptions.put("log.bar", "foo");
+
+        assertThat(TableStoreFactory.filterFileStoreOptions(enrichedOptions))
+                .containsExactlyInAnyOrderEntriesOf(expectedFileStoreOptions);
+    }
+
+    @Test
+    public void testTablePath() {
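+        // the table path is the configured root dir plus catalog/database/table sub-directories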
+        Map<String, String> options = of(FILE_PATH.key(), "dummy:/foo/bar");
+        assertThat(TableStoreFactory.tablePath(options, TABLE_IDENTIFIER))
+                .isEqualTo(
+                        new org.apache.flink.core.fs.Path(
+                                "dummy:/foo/bar/root/catalog.catalog/database.db/table"));
+
+        assertThatThrownBy(
+                        () -> TableStoreFactory.tablePath(Collections.emptyMap(), TABLE_IDENTIFIER))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Failed to create file store path. "
+                                + "Please specify a root dir by setting session level configuration "
+                                + "as `SET 'table-store.file.path' = '...'`. "
+                                + "Alternatively, you can use a per-table root dir "
+                                + "as `CREATE TABLE ${table} (...) WITH ('file.path' = '...')`");
+    }
+
+    @ParameterizedTest
+    @MethodSource("providingEnrichedOptionsForChangeTracking")
+    public void testEnableChangeTracking(Map<String, String> options, boolean expected) {
+        assertThat(TableStoreFactory.enableChangeTracking(options)).isEqualTo(expected);
+    }
+
     // ~ Tools ------------------------------------------------------------------
 
     private static Stream<Arguments> providingOptions() {
@@ -240,6 +278,14 @@ public class TableStoreFactoryTest {
                 Arguments.of(enrichedOptions, false));
     }
 
+    private static Stream<Arguments> providingEnrichedOptionsForChangeTracking() {
+        return Stream.of(
+                Arguments.of(Collections.emptyMap(), true),
+                Arguments.of(of(CHANGE_TRACKING.key(), "true"), true),
+                Arguments.of(of(CHANGE_TRACKING.key(), "false"), false),
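+                // the session-level (prefixed) key is not a table option, so the default (true) applies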
+                Arguments.of(of(TABLE_STORE_PREFIX + CHANGE_TRACKING.key(), "false"), true));
+    }
+
     private static Map<String, String> addPrefix(
             Map<String, String> options, String prefix, Predicate<String> predicate) {
         Map<String, String> newOptions = new HashMap<>();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
deleted file mode 100644
index 4206970..0000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.kafka.KafkaTableTestBase;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.types.utils.TypeConversions;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
-import static org.apache.flink.table.store.connector.TableStoreITCase.StatementType.CREATE_STATEMENT;
-import static org.apache.flink.table.store.connector.TableStoreITCase.StatementType.DROP_STATEMENT;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** End-to-end tests for table store. */
-@RunWith(Parameterized.class)
-public class TableStoreITCase extends KafkaTableTestBase {
-
-    private static final String CURRENT_CATALOG = "catalog";
-    private static final String CURRENT_DATABASE = "database";
-
-    private final ObjectIdentifier tableIdentifier;
-    private final StatementType statementType;
-    private final boolean enableChangeTracking;
-    private final boolean ignoreException;
-    private final ExpectedResult expectedResult;
-
-    private String rootPath;
-    private ResolvedCatalogTable resolvedTable;
-    @Rule public TestName name = new TestName();
-
-    public TableStoreITCase(
-            String tableName,
-            StatementType statementType,
-            boolean enableChangeTracking,
-            boolean ignoreException,
-            ExpectedResult expectedResult) {
-        this.tableIdentifier = ObjectIdentifier.of(CURRENT_CATALOG, CURRENT_DATABASE, tableName);
-        this.statementType = statementType;
-        this.enableChangeTracking = enableChangeTracking;
-        this.ignoreException = ignoreException;
-        this.expectedResult = expectedResult;
-    }
-
-    @Parameterized.Parameters(
-            name =
-                    "tableName-{0}, statementType-{1}, enableChangeTracking-{2}, ignoreException-{3}, expectedResult-{4}")
-    public static Collection<Object[]> data() {
-        return Stream.concat(prepareCreateTableSpecs().stream(), prepareDropTableSpecs().stream())
-                .collect(Collectors.toList());
-    }
-
-    @Before
-    @Override
-    public void setup() {
-        super.setup();
-        ((TableEnvironmentImpl) tEnv)
-                .getCatalogManager()
-                .registerCatalog(
-                        CURRENT_CATALOG,
-                        new GenericInMemoryCatalog(CURRENT_CATALOG, CURRENT_DATABASE));
-        tEnv.useCatalog(CURRENT_CATALOG);
-        resolvedTable =
-                createResolvedTable(
-                        Collections.emptyMap(),
-                        RowType.of(new IntType(), new VarCharType()),
-                        new int[0]);
-        try {
-            rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-        prepareSessionContext();
-        // match parameter type with test name to conditionally skip before setup, because junit4
-        // doesn't support multiple data providers for different methods
-        if (name.getMethodName().startsWith("testCreateTable")
-                && statementType == CREATE_STATEMENT) {
-            prepareEnvForCreateTable();
-        } else if (name.getMethodName().startsWith("testDropTable")
-                && statementType == DROP_STATEMENT) {
-            prepareEnvForDropTable();
-        }
-    }
-
-    @Test
-    public void testCreateTable() {
-        Assume.assumeTrue(statementType == CREATE_STATEMENT);
-        final String ddl =
-                String.format(
-                        "CREATE TABLE%s%s (f0 INT, f1 STRING)\n",
-                        ignoreException ? " IF NOT EXISTS " : " ",
-                        tableIdentifier.asSerializableString());
-        if (expectedResult.success) {
-            tEnv.executeSql(ddl);
-            // check catalog
-            assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
-                    .isPresent();
-            // check table store
-            assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
-                    .exists();
-            // check log store
-            assertThat(topicExists(tableIdentifier.asSummaryString()))
-                    .isEqualTo(enableChangeTracking);
-        } else {
-            // check inconsistency between catalog/file store/log store
-            assertThat(ignoreException).isFalse();
-            assertThatThrownBy(() -> tEnv.executeSql(ddl))
-                    .getCause()
-                    .isInstanceOf(expectedResult.expectedType)
-                    .hasMessageContaining(expectedResult.expectedMessage);
-
-            if (expectedResult.expectedMessage.contains(
-                    String.format("already exists in Catalog %s", CURRENT_CATALOG))) {
-                assertThat(
-                                ((TableEnvironmentImpl) tEnv)
-                                        .getCatalogManager()
-                                        .getTable(tableIdentifier))
-                        .isPresent();
-            } else {
-                // throw exception when creating file path/topic, and catalog meta does not exist
-                assertThat(
-                                ((TableEnvironmentImpl) tEnv)
-                                        .getCatalogManager()
-                                        .getTable(tableIdentifier))
-                        .isNotPresent();
-            }
-        }
-    }
-
-    @Test
-    public void testDropTable() {
-        Assume.assumeTrue(statementType == DROP_STATEMENT);
-        String ddl =
-                String.format(
-                        "DROP TABLE%s%s\n",
-                        ignoreException ? " IF EXISTS " : " ",
-                        tableIdentifier.asSerializableString());
-
-        if (expectedResult.success) {
-            tEnv.executeSql(ddl);
-            // check catalog
-            assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
-                    .isNotPresent();
-            // check table store
-            assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
-                    .doesNotExist();
-            // check log store
-            assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
-        } else {
-            // check inconsistency between catalog/file store/log store
-            assertThat(ignoreException).isFalse();
-            if (ValidationException.class.isAssignableFrom(expectedResult.expectedType)) {
-                // successfully delete path/topic, but schema doesn't exist in catalog
-                assertThatThrownBy(() -> tEnv.executeSql(ddl))
-                        .isInstanceOf(expectedResult.expectedType)
-                        .hasMessageContaining(expectedResult.expectedMessage);
-                assertThat(
-                                ((TableEnvironmentImpl) tEnv)
-                                        .getCatalogManager()
-                                        .getTable(tableIdentifier))
-                        .isNotPresent();
-            } else {
-                assertThatThrownBy(() -> tEnv.executeSql(ddl))
-                        .getCause()
-                        .isInstanceOf(expectedResult.expectedType)
-                        .hasMessageContaining(expectedResult.expectedMessage);
-                // throw exception when deleting file path/topic, so schema still exists in catalog
-                assertThat(
-                                ((TableEnvironmentImpl) tEnv)
-                                        .getCatalogManager()
-                                        .getTable(tableIdentifier))
-                        .isPresent();
-            }
-        }
-    }
-
-    // ~ Tools ------------------------------------------------------------------
-
-    private static List<Object[]> prepareCreateTableSpecs() {
-        List<Object[]> specs = new ArrayList<>();
-        // successful case specs
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    CREATE_STATEMENT,
-                    true,
-                    true,
-                    new ExpectedResult().success(true)
-                });
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    CREATE_STATEMENT,
-                    false,
-                    true,
-                    new ExpectedResult().success(true)
-                });
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    CREATE_STATEMENT,
-                    true,
-                    false,
-                    new ExpectedResult().success(true)
-                });
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    CREATE_STATEMENT,
-                    false,
-                    false,
-                    new ExpectedResult().success(true)
-                });
-
-        // failed case specs
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    CREATE_STATEMENT,
-                    false,
-                    false,
-                    new ExpectedResult()
-                            .success(false)
-                            .expectedType(TableException.class)
-                            .expectedMessage("Failed to create file store path.")
-                });
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    CREATE_STATEMENT,
-                    true,
-                    false,
-                    new ExpectedResult()
-                            .success(false)
-                            .expectedType(TableException.class)
-                            .expectedMessage("Failed to create kafka topic.")
-                });
-        final String tableName = "table_" + UUID.randomUUID();
-        specs.add(
-                new Object[] {
-                    tableName,
-                    CREATE_STATEMENT,
-                    true,
-                    false,
-                    new ExpectedResult()
-                            .success(false)
-                            .expectedType(TableAlreadyExistException.class)
-                            .expectedMessage(
-                                    String.format(
-                                            "Table (or view) %s already exists in Catalog %s.",
-                                            ObjectIdentifier.of(
-                                                            CURRENT_CATALOG,
-                                                            CURRENT_DATABASE,
-                                                            tableName)
-                                                    .toObjectPath()
-                                                    .getFullName(),
-                                            CURRENT_CATALOG))
-                });
-        return specs;
-    }
-
-    private static List<Object[]> prepareDropTableSpecs() {
-        List<Object[]> specs = new ArrayList<>();
-        // successful case specs
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    DROP_STATEMENT,
-                    true,
-                    true,
-                    new ExpectedResult().success(true)
-                });
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    DROP_STATEMENT,
-                    false,
-                    true,
-                    new ExpectedResult().success(true)
-                });
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    DROP_STATEMENT,
-                    true,
-                    false,
-                    new ExpectedResult().success(true)
-                });
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    DROP_STATEMENT,
-                    false,
-                    false,
-                    new ExpectedResult().success(true)
-                });
-
-        // failed case specs
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    DROP_STATEMENT,
-                    false,
-                    false,
-                    new ExpectedResult()
-                            .success(false)
-                            .expectedType(TableException.class)
-                            .expectedMessage("Failed to delete file store path.")
-                });
-        specs.add(
-                new Object[] {
-                    "table_" + UUID.randomUUID(),
-                    DROP_STATEMENT,
-                    true,
-                    false,
-                    new ExpectedResult()
-                            .success(false)
-                            .expectedType(TableException.class)
-                            .expectedMessage("Failed to delete kafka topic.")
-                });
-        final String tableName = "table_" + UUID.randomUUID();
-        specs.add(
-                new Object[] {
-                    tableName,
-                    DROP_STATEMENT,
-                    true,
-                    false,
-                    new ExpectedResult()
-                            .success(false)
-                            .expectedType(ValidationException.class)
-                            .expectedMessage(
-                                    String.format(
-                                            "Table with identifier '%s' does not exist.",
-                                            ObjectIdentifier.of(
-                                                            CURRENT_CATALOG,
-                                                            CURRENT_DATABASE,
-                                                            tableName)
-                                                    .asSummaryString()))
-                });
-        return specs;
-    }
-
-    private void prepareSessionContext() {
-        Configuration configuration = tEnv.getConfig().getConfiguration();
-        configuration.setString(TABLE_STORE_PREFIX + FILE_PATH.key(), rootPath);
-        configuration.setString(
-                TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
-        configuration.setBoolean(TABLE_STORE_PREFIX + CHANGE_TRACKING.key(), enableChangeTracking);
-    }
-
-    private void prepareEnvForCreateTable() {
-        if (expectedResult.success) {
-            // ensure catalog doesn't contain the table meta
-            tEnv.getCatalog(tEnv.getCurrentCatalog())
-                    .ifPresent(
-                            (catalog) -> {
-                                try {
-                                    catalog.dropTable(tableIdentifier.toObjectPath(), false);
-                                } catch (TableNotExistException ignored) {
-                                    // ignored
-                                }
-                            });
-            // ensure log store doesn't exist the topic
-            if (enableChangeTracking && !ignoreException) {
-                deleteTopicIfExists(tableIdentifier.asSummaryString());
-            }
-        } else if (expectedResult.expectedMessage.startsWith("Failed to create file store path.")) {
-            // failed when creating file store
-            Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile().mkdirs();
-        } else if (expectedResult.expectedMessage.startsWith("Failed to create kafka topic.")) {
-            // failed when creating log store
-            createTopicIfNotExists(tableIdentifier.asSummaryString(), BUCKET.defaultValue());
-        } else {
-            // failed when registering schema to catalog
-            tEnv.getCatalog(tEnv.getCurrentCatalog())
-                    .ifPresent(
-                            (catalog) -> {
-                                try {
-                                    catalog.createTable(
-                                            tableIdentifier.toObjectPath(), resolvedTable, false);
-                                } catch (TableAlreadyExistException
-                                        | DatabaseNotExistException ignored) {
-                                    // ignored
-                                }
-                            });
-        }
-    }
-
-    private void prepareEnvForDropTable() {
-        ((TableEnvironmentImpl) tEnv)
-                .getCatalogManager()
-                .createTable(resolvedTable, tableIdentifier, false);
-        if (expectedResult.success) {
-            if (ignoreException) {
-                // delete catalog schema does not affect dropping the table
-                tEnv.getCatalog(tEnv.getCurrentCatalog())
-                        .ifPresent(
-                                (catalog) -> {
-                                    try {
-                                        catalog.dropTable(tableIdentifier.toObjectPath(), false);
-                                    } catch (TableNotExistException ignored) {
-                                        // ignored
-                                    }
-                                });
-                // delete file store path does not affect dropping the table
-                deleteTablePath();
-                // delete log store topic does not affect dropping the table
-                if (enableChangeTracking) {
-                    deleteTopicIfExists(tableIdentifier.asSummaryString());
-                }
-            }
-        } else if (expectedResult.expectedMessage.startsWith("Failed to delete file store path.")) {
-            // failed when deleting file path
-            deleteTablePath();
-        } else if (expectedResult.expectedMessage.startsWith("Failed to delete kafka topic.")) {
-            // failed when deleting topic
-            deleteTopicIfExists(tableIdentifier.asSummaryString());
-        } else {
-            // failed when dropping catalog schema
-            tEnv.getCatalog(tEnv.getCurrentCatalog())
-                    .ifPresent(
-                            (catalog) -> {
-                                try {
-                                    catalog.dropTable(tableIdentifier.toObjectPath(), false);
-                                } catch (TableNotExistException ignored) {
-                                    // ignored
-                                }
-                            });
-        }
-    }
-
-    private void deleteTablePath() {
-        try {
-            FileUtils.deleteDirectory(
-                    Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile());
-        } catch (IOException ignored) {
-            // ignored
-        }
-    }
-
-    static ResolvedCatalogTable createResolvedTable(
-            Map<String, String> options, RowType rowType, int[] pk) {
-        List<String> fieldNames = rowType.getFieldNames();
-        List<DataType> fieldDataTypes =
-                rowType.getChildren().stream()
-                        .map(TypeConversions::fromLogicalToDataType)
-                        .collect(Collectors.toList());
-        CatalogTable origin =
-                CatalogTable.of(
-                        Schema.newBuilder().fromFields(fieldNames, fieldDataTypes).build(),
-                        "a comment",
-                        Collections.emptyList(),
-                        options);
-        List<Column> resolvedColumns =
-                IntStream.range(0, fieldNames.size())
-                        .mapToObj(i -> Column.physical(fieldNames.get(i), fieldDataTypes.get(i)))
-                        .collect(Collectors.toList());
-        UniqueConstraint constraint = null;
-        if (pk.length > 0) {
-            List<String> pkNames =
-                    Arrays.stream(pk).mapToObj(fieldNames::get).collect(Collectors.toList());
-            constraint = UniqueConstraint.primaryKey("pk", pkNames);
-        }
-        return new ResolvedCatalogTable(
-                origin, new ResolvedSchema(resolvedColumns, Collections.emptyList(), constraint));
-    }
-
-    static String getRelativeFileStoreTablePath(ObjectIdentifier tableIdentifier) {
-        return String.format(
-                "root/%s.catalog/%s.db/%s",
-                tableIdentifier.getCatalogName(),
-                tableIdentifier.getDatabaseName(),
-                tableIdentifier.getObjectName());
-    }
-
-    enum StatementType {
-        CREATE_STATEMENT,
-        DROP_STATEMENT
-    }
-
-    private static class ExpectedResult {
-        private boolean success;
-        private Class<? extends Throwable> expectedType;
-        private String expectedMessage;
-
-        ExpectedResult success(boolean success) {
-            this.success = success;
-            return this;
-        }
-
-        ExpectedResult expectedType(Class<? extends Throwable> exceptionClazz) {
-            this.expectedType = exceptionClazz;
-            return this;
-        }
-
-        ExpectedResult expectedMessage(String exceptionMessage) {
-            this.expectedMessage = exceptionMessage;
-            return this;
-        }
-
-        @Override
-        public String toString() {
-            return "ExpectedResult{"
-                    + "success="
-                    + success
-                    + ", expectedType="
-                    + expectedType
-                    + ", expectedMessage='"
-                    + expectedMessage
-                    + '\''
-                    + '}';
-        }
-    }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
new file mode 100644
index 0000000..4abc7f9
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+
+/** End-to-end test base for table store. */
+public abstract class TableStoreTestBase extends KafkaTableTestBase {
+
+    public static final String CURRENT_CATALOG = "catalog";
+    public static final String CURRENT_DATABASE = "database";
+
+    protected final RuntimeExecutionMode executionMode;
+    protected final ObjectIdentifier tableIdentifier;
+    protected final boolean enableChangeTracking;
+    protected final ExpectedResult expectedResult;
+
+    protected String rootPath;
+
+    public TableStoreTestBase(
+            RuntimeExecutionMode executionMode,
+            String tableName,
+            boolean enableChangeTracking,
+            ExpectedResult expectedResult) {
+        this.executionMode = executionMode;
+        this.tableIdentifier = ObjectIdentifier.of(CURRENT_CATALOG, CURRENT_DATABASE, tableName);
+        this.enableChangeTracking = enableChangeTracking;
+        this.expectedResult = expectedResult;
+    }
+
+    protected abstract void prepareEnv();
+
+    @Override
+    @Before
+    public void setup() {
+        super.setup();
+        env.setRuntimeMode(executionMode);
+        if (executionMode == RuntimeExecutionMode.STREAMING) {
+            env.enableCheckpointing(100);
+        }
+        tEnv = StreamTableEnvironment.create(env);
+        ((TableEnvironmentImpl) tEnv)
+                .getCatalogManager()
+                .registerCatalog(
+                        CURRENT_CATALOG,
+                        new GenericInMemoryCatalog(CURRENT_CATALOG, CURRENT_DATABASE));
+        tEnv.useCatalog(CURRENT_CATALOG);
+        try {
+            rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        prepareSessionContext();
+        prepareEnv();
+    }
+
+    protected void prepareSessionContext() {
+        Configuration configuration = tEnv.getConfig().getConfiguration();
+        configuration.setString(TABLE_STORE_PREFIX + FILE_PATH.key(), rootPath);
+        configuration.setString(
+                TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+        configuration.setBoolean(TABLE_STORE_PREFIX + CHANGE_TRACKING.key(), enableChangeTracking);
+    }
+
+    protected static ResolvedCatalogTable createResolvedTable(
+            Map<String, String> options, RowType rowType, List<String> partitionKeys, int[] pk) {
+        List<String> fieldNames = rowType.getFieldNames();
+        List<DataType> fieldDataTypes =
+                rowType.getChildren().stream()
+                        .map(TypeConversions::fromLogicalToDataType)
+                        .collect(Collectors.toList());
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromFields(fieldNames, fieldDataTypes).build(),
+                        "a comment",
+                        partitionKeys,
+                        options);
+        List<Column> resolvedColumns =
+                IntStream.range(0, fieldNames.size())
+                        .mapToObj(i -> Column.physical(fieldNames.get(i), fieldDataTypes.get(i)))
+                        .collect(Collectors.toList());
+        UniqueConstraint constraint = null;
+        if (pk.length > 0) {
+            List<String> pkNames =
+                    Arrays.stream(pk).mapToObj(fieldNames::get).collect(Collectors.toList());
+            constraint = UniqueConstraint.primaryKey("pk", pkNames);
+        }
+        return new ResolvedCatalogTable(
+                origin, new ResolvedSchema(resolvedColumns, Collections.emptyList(), constraint));
+    }
+
+    protected void deleteTablePath() {
+        FileUtils.deleteQuietly(
+                Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile());
+    }
+
+    protected static String getRelativeFileStoreTablePath(ObjectIdentifier tableIdentifier) {
+        return String.format(
+                "root/%s.catalog/%s.db/%s",
+                tableIdentifier.getCatalogName(),
+                tableIdentifier.getDatabaseName(),
+                tableIdentifier.getObjectName());
+    }
+
+    /** Expected result wrapper. */
+    protected static class ExpectedResult {
+        protected boolean success;
+        protected List<Row> expectedRecords;
+        protected Class<? extends Throwable> expectedType;
+        protected String expectedMessage;
+
+        ExpectedResult success(boolean success) {
+            this.success = success;
+            return this;
+        }
+
+        ExpectedResult expectedRecords(List<Row> expectedRecords) {
+            this.expectedRecords = expectedRecords;
+            return this;
+        }
+
+        ExpectedResult expectedType(Class<? extends Throwable> exceptionClazz) {
+            this.expectedType = exceptionClazz;
+            return this;
+        }
+
+        ExpectedResult expectedMessage(String exceptionMessage) {
+            this.expectedMessage = exceptionMessage;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return "ExpectedResult{"
+                    + "success="
+                    + success
+                    + ", expectedRecords="
+                    + expectedRecords
+                    + ", expectedType="
+                    + expectedType
+                    + ", expectedMessage='"
+                    + expectedMessage
+                    + '\''
+                    + '}';
+        }
+    }
+}
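
The base class above is parameterized: concrete ITCases pass the execution mode, table name, change-tracking flag, and an ExpectedResult to the constructor, and implement prepareEnv() to register the table under test. Below is a minimal sketch of such a subclass, shown only for orientation; the class name, DDL, and the inherited env/tEnv fields from KafkaTableTestBase are assumptions for illustration and are not part of this commit.

    package org.apache.flink.table.store.connector;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.types.Row;

    import org.junit.Test;

    import java.util.Collections;

    /** Hypothetical subclass sketch showing the intended extension pattern. */
    public class ExampleTableStoreITCase extends TableStoreTestBase {

        public ExampleTableStoreITCase() {
            super(
                    RuntimeExecutionMode.STREAMING,
                    "example_table",
                    true,
                    new ExpectedResult()
                            .success(true)
                            .expectedRecords(
                                    Collections.singletonList(Row.of(1, "hello"))));
        }

        @Override
        protected void prepareEnv() {
            // Session options set in prepareSessionContext() (file path, bootstrap
            // servers, change tracking) are already in effect when this runs.
            tEnv.executeSql("CREATE TABLE example_table (a INT, b STRING)");
        }

        @Test
        public void testWrite() throws Exception {
            tEnv.executeSql("INSERT INTO example_table VALUES (1, 'hello')").await();
            // A real test would collect the emitted rows and compare them against
            // expectedResult.expectedRecords, or assert on expectedType /
            // expectedMessage when expectedResult.success is false.
        }
    }
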
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
index d6d1ad9..4cd6170 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
@@ -23,6 +23,9 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 
+import java.util.HashSet;
+import java.util.Set;
+
 /** Options for merge tree. */
 public class MergeTreeOptions {
 
@@ -129,4 +132,17 @@ public class MergeTreeOptions {
                 config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT),
                 config.get(COMPACTION_SIZE_RATIO));
     }
+
+    public static Set<ConfigOption<?>> allOptions() {
+        Set<ConfigOption<?>> allOptions = new HashSet<>();
+        allOptions.add(WRITE_BUFFER_SIZE);
+        allOptions.add(PAGE_SIZE);
+        allOptions.add(TARGET_FILE_SIZE);
+        allOptions.add(NUM_SORTED_RUNS_MAX);
+        allOptions.add(NUM_LEVELS);
+        allOptions.add(COMMIT_FORCE_COMPACT);
+        allOptions.add(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
+        allOptions.add(COMPACTION_SIZE_RATIO);
+        return allOptions;
+    }
 }
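
The new MergeTreeOptions.allOptions() helper exposes every merge-tree ConfigOption as a single set. A plausible use, sketched below under the assumption that a table factory wants to advertise these options during validation (the class and method names here are illustrative, not taken from this commit), is to merge them into the set of options the factory reports as supported.

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;

    import java.util.HashSet;
    import java.util.Set;

    /** Illustrative helper, not part of this commit. */
    public class MergeTreeOptionsUsageSketch {

        /** Collect connector-specific options plus all merge-tree options. */
        public static Set<ConfigOption<?>> optionalOptions() {
            Set<ConfigOption<?>> options = new HashSet<>();
            // ... connector-specific options would be added here ...
            options.addAll(MergeTreeOptions.allOptions());
            return options;
        }
    }
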
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index f501f59..2875e4e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionVisitor;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
@@ -29,6 +30,8 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Optional;
 import java.util.function.BiFunction;
@@ -174,6 +177,16 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
         throw new RuntimeException("Unsupported expression: " + expression.asSummaryString());
     }
 
+    @Nullable
+    public static Predicate convert(List<ResolvedExpression> filters) {
+        return filters != null
+                ? filters.stream()
+                        .map(filter -> filter.accept(PredicateConverter.CONVERTER))
+                        .reduce(And::new)
+                        .orElse(null)
+                : null;
+    }
+
     /** Thrown when an unsupported expression is encountered; the caller can choose to ignore this filter branch. */
     public static class UnsupportedExpression extends RuntimeException {}
 }
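
The added static PredicateConverter.convert(List<ResolvedExpression>) folds a filter list into a single Predicate by AND-ing the converted expressions, and returns null for a null input. Any expression the visitor cannot handle makes it throw UnsupportedExpression, so callers pushing filters down should be prepared to catch it. The sketch below shows one hedged way such a caller could look; the class and field names are illustrative, while SupportsFilterPushDown.Result.of(accepted, remaining) is the standard Flink API.

    import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
    import org.apache.flink.table.expressions.ResolvedExpression;
    import org.apache.flink.table.store.file.predicate.Predicate;
    import org.apache.flink.table.store.file.predicate.PredicateConverter;

    import javax.annotation.Nullable;

    import java.util.List;

    /** Illustrative filter push-down sketch, not part of this commit. */
    public class FilterPushDownSketch {

        @Nullable private Predicate pushedPredicate;

        public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
            try {
                // Fold all filters into one Predicate with the new helper.
                this.pushedPredicate = PredicateConverter.convert(filters);
            } catch (PredicateConverter.UnsupportedExpression e) {
                // Fall back to pushing no predicate if any filter is unsupported.
                this.pushedPredicate = null;
            }
            // Report every filter as remaining so the planner still applies them,
            // which is always a safe (if conservative) choice.
            return SupportsFilterPushDown.Result.of(filters, filters);
        }
    }
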