Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/10 10:10:08 UTC

[flink-table-store] branch master updated: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new da742766 [FLINK-27707] Implement ManagedTableFactory#onCompactTable
da742766 is described below

commit da7427662b24f6f0c150b2a19aa7b12c6597a0bd
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Fri Jun 10 18:10:03 2022 +0800

    [FLINK-27707] Implement ManagedTableFactory#onCompactTable
    
    This closes #138
---
 .../connector/StatefulPrecommittingSinkWriter.java |  28 +-
 .../flink/table/store/connector/TableStore.java    |  25 +-
 .../store/connector/TableStoreFactoryOptions.java  |  18 +-
 .../store/connector/TableStoreManagedFactory.java  |  10 +-
 .../table/store/connector/sink/StoreSink.java      |  26 +-
 .../store/connector/sink/StoreSinkCompactor.java   | 149 ++++++++++
 .../store/connector/sink/StoreSinkWriter.java      |   4 +-
 .../table/store/connector/sink/TableStoreSink.java |   3 +-
 .../connector/source/FileStoreEmptySource.java     |  99 +++++++
 .../store/connector/source/FileStoreSource.java    |  26 +-
 .../source/StaticFileStoreSplitEnumerator.java     |   2 +-
 .../store/connector/AlterTableCompactITCase.java   | 292 ++++++++++++++++++++
 .../store/connector/FileStoreTableITCase.java      |   3 +-
 .../connector/TableStoreManagedFactoryTest.java    | 179 ++++++------
 .../table/store/connector/sink/StoreSinkTest.java  |  88 +++++-
 .../table/store/connector/sink/TestFileStore.java  |  10 +
 .../connector/source/FileStoreSourceTest.java      | 299 ---------------------
 .../flink/table/store/file/FileStoreImpl.java      |  21 +-
 .../table/store/file/mergetree/Increment.java      |   6 +
 .../file/mergetree/compact/CompactManager.java     |  43 +--
 .../store/file/mergetree/compact/CompactUnit.java  |   2 +-
 .../store/file/operation/FileStoreCommitImpl.java  |  14 +-
 .../table/store/file/operation/FileStoreWrite.java |   9 +
 .../store/file/operation/FileStoreWriteImpl.java   |  41 ++-
 .../store/file/predicate/PredicateConverter.java   |   1 +
 .../store/file/utils/FileStorePathFactory.java     |  23 +-
 .../store/file/utils/KeyComparatorSupplier.java    |  49 ++++
 .../store/file/utils/PartitionedManifestMeta.java  | 131 ---------
 .../table/store/file/writer/CompactWriter.java     |  82 ++++++
 .../table/store/file/KeyValueSerializerTest.java   |   4 +-
 .../table/store/file/TestKeyValueGenerator.java    | 154 +++++++++--
 .../flink/table/store/file/data/DataFileTest.java  |  10 +-
 .../store/file/data/DataFileTestDataGenerator.java |   2 +-
 .../store/file/manifest/ManifestFileTest.java      |   7 +-
 .../store/file/manifest/ManifestListTest.java      |   4 +-
 .../file/manifest/ManifestTestDataGenerator.java   |   2 +-
 .../store/file/operation/FileStoreCommitTest.java  |  27 +-
 .../store/file/operation/FileStoreExpireTest.java  |   4 +-
 .../store/file/operation/FileStoreReadTest.java    |   4 +-
 .../store/file/operation/FileStoreScanTest.java    |   4 +-
 .../store/file/operation/TestCommitThread.java     |   4 +-
 41 files changed, 1232 insertions(+), 677 deletions(-)
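
For context, the user-facing entry point for this change is Flink SQL's ALTER TABLE ... COMPACT statement on a managed table-store table. Below is a minimal, hypothetical sketch of driving the new code path end to end; the table name, schema, data, and the table-store.root-path value are illustrative only, while the statement shapes mirror the AlterTableCompactITCase added in this commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class AlterTableCompactExample {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
            // Assumed session config; the managed factory derives the table path from it.
            tEnv.getConfig().getConfiguration().setString("table-store.root-path", "/tmp/table-store");
            tEnv.executeSql(
                    "CREATE TABLE IF NOT EXISTS T1 ("
                            + "dt STRING, shopId INT, orderId BIGINT NOT NULL, itemId BIGINT"
                            + ") PARTITIONED BY (dt)");
            tEnv.executeSql("INSERT INTO T1 VALUES ('2022-06-10', 1, 100, 1000)").await();
            // Triggers TableStoreManagedFactory#onCompactTable; the enriched options flag the
            // job as a manual compaction and carry the partition spec as JSON.
            tEnv.executeSql("ALTER TABLE T1 PARTITION (dt = '2022-06-10') COMPACT").await();
        }
    }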

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
similarity index 54%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
copy to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
index 253bf8f6..f4266d82 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
@@ -16,20 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.operation;
+package org.apache.flink.table.store.connector;
 
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.sink.Committable;
 
-import java.util.concurrent.ExecutorService;
-
-/** Write operation which provides {@link RecordWriter} creation. */
-public interface FileStoreWrite {
-
-    /** Create a {@link RecordWriter} from partition and bucket. */
-    RecordWriter createWriter(BinaryRowData partition, int bucket, ExecutorService compactExecutor);
-
-    /** Create an empty {@link RecordWriter} from partition and bucket. */
-    RecordWriter createEmptyWriter(
-            BinaryRowData partition, int bucket, ExecutorService compactExecutor);
-}
+/**
+ * The base interface for file store sink writers.
+ *
+ * @param <WriterStateT> The type of the writer's state.
+ */
+public interface StatefulPrecommittingSinkWriter<WriterStateT>
+        extends StatefulSink.StatefulSinkWriter<RowData, WriterStateT>,
+                TwoPhaseCommittingSink.PrecommittingSinkWriter<RowData, Committable> {}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 3e9b1f7c..224949b8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.connector.sink.BucketStreamPartitioner;
 import org.apache.flink.table.store.connector.sink.StoreSink;
 import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
+import org.apache.flink.table.store.connector.source.FileStoreEmptySource;
 import org.apache.flink.table.store.connector.source.FileStoreSource;
 import org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
 import org.apache.flink.table.store.connector.source.StaticFileStoreSplitEnumerator;
@@ -52,6 +53,7 @@ import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogSourceProvider;
@@ -66,6 +68,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.flink.table.store.file.FileStoreOptions.MERGE_ENGINE;
@@ -118,6 +122,20 @@ public class TableStore {
         return schema.partitionKeys().size() > 0;
     }
 
+    public boolean isCompactionTask() {
+        return options.get(COMPACTION_MANUAL_TRIGGERED);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Nullable
+    public Map<String, String> getCompactPartSpec() {
+        String json = options.get(COMPACTION_PARTITION_SPEC);
+        if (json == null) {
+            return null;
+        }
+        return JsonSerdeUtil.fromJson(json, Map.class);
+    }
+
     public boolean valueCountMode() {
         return trimmedPrimaryKeys().size() == 0;
     }
@@ -308,8 +326,7 @@ public class TableStore {
                     continuousScanLatest,
                     projectedFields,
                     partitionPredicate,
-                    fieldPredicate,
-                    null);
+                    fieldPredicate);
         }
 
         private Source<RowData, ?, ?> buildSource() {
@@ -352,7 +369,7 @@ public class TableStore {
                             .orElse(type);
             DataStreamSource<RowData> dataStream =
                     env.fromSource(
-                            buildSource(),
+                            isCompactionTask() ? new FileStoreEmptySource() : buildSource(),
                             WatermarkStrategy.noWatermarks(),
                             tableIdentifier.asSummaryString(),
                             InternalTypeInfo.of(produceType));
@@ -428,6 +445,8 @@ public class TableStore {
                             trimmedPrimaryKeysIndex(),
                             fullPrimaryKeysIndex(),
                             numBucket,
+                            isCompactionTask(),
+                            getCompactPartSpec(),
                             lockFactory,
                             overwritePartition,
                             logSinkProvider);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index b0aa9302..18d04a68 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -47,15 +47,20 @@ public class TableStoreFactoryOptions {
                                     + "of a partition/table.");
 
     @Internal
-    public static final ConfigOption<String> COMPACTION_SCANNED_MANIFEST =
-            ConfigOptions.key("compaction.scanned-manifest")
+    public static final ConfigOption<Boolean> COMPACTION_MANUAL_TRIGGERED =
+            ConfigOptions.key("compaction.manual-triggered")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "An internal flag to indicate a manual triggered compaction job.");
+
+    @Internal
+    public static final ConfigOption<String> COMPACTION_PARTITION_SPEC =
+            ConfigOptions.key("compaction.partition-spec")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "The serialized json string of manifest entries which are scanned during manual compaction "
-                                    + "planning phase and injected back into enriched options. The json format contains "
-                                    + "snapshot id and each partition's data file meta list (among which "
-                                    + "each data file meta is encoded by Base64 format) tagged with bucket id.");
+                            "An internal json string to record the user-specified partition spec for the manual triggered compaction.");
 
     public static final ConfigOption<String> LOG_SYSTEM =
             ConfigOptions.key("log.system")
@@ -77,7 +82,6 @@ public class TableStoreFactoryOptions {
     public static Set<ConfigOption<?>> allOptions() {
         Set<ConfigOption<?>> allOptions = new HashSet<>();
         allOptions.add(COMPACTION_RESCALE_BUCKET);
-        allOptions.add(COMPACTION_SCANNED_MANIFEST);
         allOptions.add(LOG_SYSTEM);
         allOptions.add(SINK_PARALLELISM);
         allOptions.add(SCAN_PARALLELISM);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index e858ca5e..bdd6bb3d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.util.Preconditions;
 
@@ -39,6 +40,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
@@ -183,6 +186,11 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
+        newOptions.put(
+                COMPACTION_PARTITION_SPEC.key(),
+                JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()));
+        return newOptions;
     }
 }
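
The partition spec carried by the enriched options above is just a JSON-encoded string map. A small sketch of the round trip, reusing the JsonSerdeUtil calls that appear in this diff (the spec values are examples only):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.table.store.file.utils.JsonSerdeUtil;

    class PartitionSpecRoundTrip {
        @SuppressWarnings("unchecked")
        static Map<String, String> roundTrip() {
            Map<String, String> spec = new HashMap<>();
            spec.put("dt", "2022-06-10"); // example partition values
            spec.put("hr", "8");
            // Written under 'compaction.partition-spec' by onCompactTable ...
            String json = JsonSerdeUtil.toJson(spec);
            // ... and read back (as a raw Map) by TableStore#getCompactPartSpec.
            return JsonSerdeUtil.fromJson(json, Map.class);
        }
    }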
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 5a3d498c..b02823aa 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.table.catalog.CatalogLock;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
 import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
 import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.WriteMode;
@@ -44,6 +45,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.function.Consumer;
@@ -69,6 +71,10 @@ public class StoreSink<WriterStateT, LogCommT>
 
     private final int numBucket;
 
+    private final boolean compactionTask;
+
+    @Nullable private final Map<String, String> compactPartitionSpec;
+
     @Nullable private final CatalogLock.Factory lockFactory;
 
     @Nullable private final Map<String, String> overwritePartition;
@@ -83,6 +89,8 @@ public class StoreSink<WriterStateT, LogCommT>
             int[] primaryKeys,
             int[] logPrimaryKeys,
             int numBucket,
+            boolean compactionTask,
+            @Nullable Map<String, String> compactPartitionSpec,
             @Nullable CatalogLock.Factory lockFactory,
             @Nullable Map<String, String> overwritePartition,
             @Nullable LogSinkProvider logSinkProvider) {
@@ -93,19 +101,33 @@ public class StoreSink<WriterStateT, LogCommT>
         this.primaryKeys = primaryKeys;
         this.logPrimaryKeys = logPrimaryKeys;
         this.numBucket = numBucket;
+        this.compactionTask = compactionTask;
+        this.compactPartitionSpec = compactPartitionSpec;
         this.lockFactory = lockFactory;
         this.overwritePartition = overwritePartition;
         this.logSinkProvider = logSinkProvider;
     }
 
     @Override
-    public StoreSinkWriter<WriterStateT> createWriter(InitContext initContext) throws IOException {
+    public StatefulPrecommittingSinkWriter<WriterStateT> createWriter(InitContext initContext)
+            throws IOException {
         return restoreWriter(initContext, null);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StoreSinkWriter<WriterStateT> restoreWriter(
+    public StatefulPrecommittingSinkWriter<WriterStateT> restoreWriter(
             InitContext initContext, Collection<WriterStateT> states) throws IOException {
+        if (compactionTask) {
+            return (StatefulPrecommittingSinkWriter<WriterStateT>)
+                    new StoreSinkCompactor(
+                            initContext.getSubtaskId(),
+                            initContext.getNumberOfParallelSubtasks(),
+                            fileStore,
+                            compactPartitionSpec == null
+                                    ? Collections.emptyMap()
+                                    : compactPartitionSpec);
+        }
         SinkWriter<SinkRecord> logWriter = null;
         LogWriteCallback logCallback = null;
         if (logSinkProvider != null) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
new file mode 100644
index 00000000..d5fcf374
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** A dedicated {@link SinkWriter} for manual triggered compaction. */
+public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StoreSinkCompactor.class);
+
+    private final int subTaskId;
+    private final int numOfParallelInstances;
+
+    private final FileStore fileStore;
+    private final Map<String, String> partitionSpec;
+    private final ExecutorService compactExecutor;
+
+    public StoreSinkCompactor(
+            int subTaskId,
+            int numOfParallelInstances,
+            FileStore fileStore,
+            Map<String, String> partitionSpec) {
+        this.subTaskId = subTaskId;
+        this.numOfParallelInstances = numOfParallelInstances;
+        this.fileStore = fileStore;
+        this.partitionSpec = partitionSpec;
+        this.compactExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory(
+                                String.format("compaction-subtask-%d", subTaskId)));
+    }
+
+    @Override
+    public void flush(boolean endOfInput) {}
+
+    @Override
+    public void write(RowData element, Context context) throws IOException, InterruptedException {
+        // nothing to write
+    }
+
+    @Override
+    public void close() throws Exception {
+        compactExecutor.shutdownNow();
+    }
+
+    @Override
+    public List<Void> snapshotState(long checkpointId) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<Committable> prepareCommit() throws IOException {
+        List<Committable> committables = new ArrayList<>();
+
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        partitionSpec, fileStore.partitionType()))
+                        .plan();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                plan.groupByPartFiles().entrySet()) {
+            BinaryRowData partition = partEntry.getKey();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                int bucket = bucketEntry.getKey();
+                List<DataFileMeta> restoredFiles = bucketEntry.getValue();
+                if (select(partition, bucket)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Assign partition {}, bucket {} to subtask {}",
+                                FileStorePathFactory.getPartitionComputer(
+                                                fileStore.partitionType(),
+                                                FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
+                                                        .defaultValue())
+                                        .generatePartValues(partition),
+                                bucket,
+                                subTaskId);
+                    }
+                    RecordWriter writer =
+                            fileStore
+                                    .newWrite()
+                                    .createCompactWriter(
+                                            partition.copy(),
+                                            bucket,
+                                            compactExecutor,
+                                            restoredFiles);
+                    FileCommittable committable;
+                    try {
+                        committable =
+                                new FileCommittable(
+                                        partition, bucketEntry.getKey(), writer.prepareCommit());
+                        committables.add(new Committable(Committable.Kind.FILE, committable));
+                    } catch (Exception e) {
+                        throw new IOException(e);
+                    }
+                }
+            }
+        }
+        return committables;
+    }
+
+    private boolean select(BinaryRowData partition, int bucket) {
+        return subTaskId == Math.abs(Objects.hash(partition, bucket) % numOfParallelInstances);
+    }
+}
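
StoreSinkCompactor spreads the scanned (partition, bucket) pairs over the parallel sink subtasks with the hash-based select() above. A standalone sketch of that rule, with the partition reduced to a plain object and an example parallelism of 4:

    import java.util.Objects;

    class CompactWorkAssignment {
        // Mirrors StoreSinkCompactor#select: each (partition, bucket) pair is
        // compacted by exactly one of the parallel subtasks.
        static boolean select(Object partition, int bucket, int subTaskId, int parallelism) {
            return subTaskId == Math.abs(Objects.hash(partition, bucket) % parallelism);
        }

        public static void main(String[] args) {
            for (int subtask = 0; subtask < 4; subtask++) {
                System.out.println("subtask " + subtask + " -> " + select("dt=2022-06-10", 0, subtask, 4));
            }
        }
    }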
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 9a914636..62e2025e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
@@ -54,8 +55,7 @@ import java.util.concurrent.Executors;
 
 /** A {@link SinkWriter} for dynamic store. */
 public class StoreSinkWriter<WriterStateT>
-        implements StatefulSinkWriter<RowData, WriterStateT>,
-                PrecommittingSinkWriter<RowData, Committable> {
+        implements StatefulPrecommittingSinkWriter<WriterStateT> {
 
     private static final BinaryRowData DUMMY_KEY = BinaryRowDataUtil.EMPTY_ROW;
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 962aa0f4..0704c3ec 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -127,7 +127,8 @@ public class TableStoreSink
                             });
         }
         // Do not sink to log store when overwrite mode
-        final LogSinkProvider finalLogSinkProvider = overwrite ? null : logSinkProvider;
+        final LogSinkProvider finalLogSinkProvider =
+                overwrite || tableStore.isCompactionTask() ? null : logSinkProvider;
         return (DataStreamSinkProvider)
                 (providerContext, dataStream) ->
                         tableStore
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java
new file mode 100644
index 00000000..bb671f23
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreEmptySource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** A dedicated empty source for "ALTER TABLE ... COMPACT". */
+public class FileStoreEmptySource
+        implements Source<RowData, FileStoreSourceSplit, PendingSplitsCheckpoint> {
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, FileStoreSourceSplit> createReader(
+            SourceReaderContext readerContext) {
+        return new SourceReader<RowData, FileStoreSourceSplit>() {
+            @Override
+            public void start() {}
+
+            @Override
+            public InputStatus pollNext(ReaderOutput<RowData> output) {
+                return InputStatus.END_OF_INPUT;
+            }
+
+            @Override
+            public List<FileStoreSourceSplit> snapshotState(long checkpointId) {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public CompletableFuture<Void> isAvailable() {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public void addSplits(List<FileStoreSourceSplit> splits) {}
+
+            @Override
+            public void notifyNoMoreSplits() {}
+
+            @Override
+            public void close() {}
+        };
+    }
+
+    @Override
+    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context) {
+        return restoreEnumerator(context, null);
+    }
+
+    @Override
+    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            PendingSplitsCheckpoint checkpoint) {
+        return new StaticFileStoreSplitEnumerator(context, null, Collections.emptyList());
+    }
+
+    @Override
+    public FileStoreSourceSplitSerializer getSplitSerializer() {
+        return new FileStoreSourceSplitSerializer();
+    }
+
+    @Override
+    public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
+        return new PendingSplitsCheckpointSerializer(getSplitSerializer());
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 61f70997..fccd3c7e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.connector.source;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -32,7 +31,6 @@ import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.utils.PartitionedManifestMeta;
 
 import javax.annotation.Nullable;
 
@@ -66,12 +64,6 @@ public class FileStoreSource
 
     @Nullable private final Predicate fieldPredicate;
 
-    /**
-     * The partitioned manifest meta collected at planning phase when manual compaction is
-     * triggered.
-     */
-    @Nullable private final PartitionedManifestMeta specifiedPartManifests;
-
     public FileStoreSource(
             FileStore fileStore,
             WriteMode writeMode,
@@ -81,8 +73,7 @@ public class FileStoreSource
             boolean latestContinuous,
             @Nullable int[][] projectedFields,
             @Nullable Predicate partitionPredicate,
-            @Nullable Predicate fieldPredicate,
-            @Nullable PartitionedManifestMeta specifiedPartManifests) {
+            @Nullable Predicate fieldPredicate) {
         this.fileStore = fileStore;
         this.writeMode = writeMode;
         this.valueCountMode = valueCountMode;
@@ -92,7 +83,6 @@ public class FileStoreSource
         this.projectedFields = projectedFields;
         this.partitionPredicate = partitionPredicate;
         this.fieldPredicate = fieldPredicate;
-        this.specifiedPartManifests = specifiedPartManifests;
     }
 
     @Override
@@ -138,14 +128,6 @@ public class FileStoreSource
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
 
-        if (specifiedPartManifests != null) {
-            return new StaticFileStoreSplitEnumerator(
-                    context,
-                    scan.snapshot(specifiedPartManifests.getSnapshotId()),
-                    new FileStoreSourceSplitGenerator()
-                            .createSplits(specifiedPartManifests.getManifestEntries()));
-        }
-
         if (partitionPredicate != null) {
             scan.withPartitionFilter(partitionPredicate);
         }
@@ -205,10 +187,4 @@ public class FileStoreSource
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
-
-    @VisibleForTesting
-    @Nullable
-    PartitionedManifestMeta getSpecifiedPartManifests() {
-        return specifiedPartManifests;
-    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index c289a606..599417f1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -44,7 +44,7 @@ public class StaticFileStoreSplitEnumerator
 
     public StaticFileStoreSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
-            Snapshot snapshot,
+            @Nullable Snapshot snapshot,
             Collection<FileStoreSourceSplit> splits) {
         this.context = context;
         this.snapshot = snapshot;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
new file mode 100644
index 00000000..d664f5ee
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.SINGLE_PARTITIONED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for 'ALTER TABLE ... COMPACT'. */
+public class AlterTableCompactITCase extends FileStoreTableITCase {
+
+    private TestKeyValueGenerator generator;
+
+    @Override
+    protected List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS T0 (\n"
+                        + "shopId INT\n, "
+                        + "orderId BIGINT NOT NULL\n, "
+                        + "itemId BIGINT)",
+                "CREATE TABLE IF NOT EXISTS T1 (\n"
+                        + "dt STRING\n, "
+                        + "shopId INT\n, "
+                        + "orderId BIGINT NOT NULL\n, "
+                        + "itemId BIGINT)\n "
+                        + "PARTITIONED BY (dt)",
+                "CREATE TABLE IF NOT EXISTS T2 (\n"
+                        + "dt STRING\n, "
+                        + "hr INT\n, "
+                        + "shopId INT\n, "
+                        + "orderId BIGINT NOT NULL\n, "
+                        + "itemId BIGINT)"
+                        + "PARTITIONED BY (dt, hr)");
+    }
+
+    @Test
+    public void testNonPartitioned() throws IOException {
+        generator = new TestKeyValueGenerator(NON_PARTITIONED);
+        Random random = new Random();
+        innerTest("T0", random.nextInt(10) + 1, NON_PARTITIONED);
+    }
+
+    @Test
+    public void testSinglePartitioned() throws IOException {
+        generator = new TestKeyValueGenerator(SINGLE_PARTITIONED);
+        Random random = new Random();
+        innerTest("T1", random.nextInt(10) + 1, SINGLE_PARTITIONED);
+    }
+
+    @Test
+    public void testMultiPartitioned() throws IOException {
+        generator = new TestKeyValueGenerator(MULTI_PARTITIONED);
+        Random random = new Random();
+        innerTest("T2", random.nextInt(10) + 1, MULTI_PARTITIONED);
+    }
+
+    private void innerTest(String tableName, int batchNum, TestKeyValueGenerator.GeneratorMode mode)
+            throws IOException {
+        // increase trigger to avoid auto-compaction
+        batchSql(
+                String.format(
+                        "ALTER TABLE %s SET ('num-sorted-run.compaction-trigger' = '50')",
+                        tableName));
+        batchSql(
+                String.format(
+                        "ALTER TABLE %s SET ('num-sorted-run.stop-trigger' = '50')", tableName));
+
+        Random random = new Random();
+        List<KeyValue> dataset = new ArrayList<>();
+        long latestSnapshot = 0L;
+        for (int i = 0; i < batchNum; i++) {
+            List<KeyValue> data = generateData(random.nextInt(200) + 1);
+            String insertQuery =
+                    String.format(
+                            "INSERT INTO %s VALUES \n%s",
+                            tableName,
+                            data.stream()
+                                    .map(kv -> kvAsString(kv, mode))
+                                    .collect(Collectors.joining(",\n")));
+            batchSql(insertQuery);
+            Snapshot snapshot = findLatestSnapshot(tableName);
+            assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            latestSnapshot = snapshot.id();
+            dataset.addAll(data);
+        }
+        if (mode == NON_PARTITIONED) {
+            String compactQuery = String.format("ALTER TABLE %s COMPACT", tableName);
+            String selectQuery = String.format("SELECT * FROM %s", tableName);
+            compactAndCheck(
+                    tableName,
+                    compactQuery,
+                    selectQuery,
+                    latestSnapshot,
+                    dataset.stream()
+                            .map(kv -> convertToRow(kv, mode))
+                            .collect(Collectors.toList()));
+        } else {
+            List<BinaryRowData> partitions =
+                    dataset.stream()
+                            .map(kv -> generator.getPartition(kv))
+                            .distinct()
+                            .collect(Collectors.toList());
+            while (!partitions.isEmpty()) {
+                BinaryRowData part = pickPartition(partitions);
+                Map<String, String> partSpec = TestKeyValueGenerator.toPartitionMap(part, mode);
+                String compactQuery =
+                        String.format(
+                                "ALTER TABLE %s PARTITION (%s) COMPACT",
+                                tableName, partAsString(partSpec, false));
+                String selectQuery =
+                        String.format(
+                                "SELECT * FROM %s WHERE %s",
+                                tableName, partAsString(partSpec, true));
+                compactAndCheck(
+                        tableName,
+                        compactQuery,
+                        selectQuery,
+                        latestSnapshot,
+                        dataset.stream()
+                                .filter(kv -> partFilter(kv, part, mode))
+                                .map(kv -> convertToRow(kv, mode))
+                                .collect(Collectors.toList()));
+                latestSnapshot = findLatestSnapshot(tableName).id();
+            }
+        }
+    }
+
+    private void compactAndCheck(
+            String tableName,
+            String compactQuery,
+            String selectQuery,
+            long latestSnapshot,
+            List<Row> expectedData)
+            throws IOException {
+        batchSql(compactQuery);
+        Snapshot snapshot = findLatestSnapshot(tableName);
+        assertThat(snapshot.id()).isEqualTo(latestSnapshot + 1);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        // check idempotence
+        batchSql(compactQuery);
+        assertThat(findLatestSnapshot(tableName).id()).isEqualTo(snapshot.id());
+
+        // read data
+        List<Row> readData = batchSql(selectQuery);
+        assertThat(readData).containsExactlyInAnyOrderElementsOf(expectedData);
+    }
+
+    private boolean partFilter(
+            KeyValue kv, BinaryRowData partition, TestKeyValueGenerator.GeneratorMode mode) {
+        RowData record = kv.value();
+        if (mode == SINGLE_PARTITIONED) {
+            return record.getString(0).equals(partition.getString(0));
+        } else if (mode == MULTI_PARTITIONED) {
+            return record.getString(0).equals(partition.getString(0))
+                    && record.getInt(1) == partition.getInt(1);
+        }
+        return true;
+    }
+
+    private String partAsString(Map<String, String> partSpec, boolean predicate) {
+        String dt = String.format("dt = '%s'", partSpec.get("dt"));
+        String hr = partSpec.get("hr");
+        if (hr == null) {
+            return dt;
+        }
+        hr = String.format("hr = %s", hr);
+        return predicate ? String.join(" AND ", dt, hr) : String.join(", ", dt, hr);
+    }
+
+    private BinaryRowData pickPartition(List<BinaryRowData> partitions) {
+        Random random = new Random();
+        int idx = random.nextInt(partitions.size());
+        return partitions.remove(idx);
+    }
+
+    private List<KeyValue> generateData(int numRecords) {
+        List<KeyValue> data = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            KeyValue kv = generator.next();
+            if (kv.valueKind() == ValueKind.ADD) {
+                data.add(kv);
+            }
+        }
+        return data;
+    }
+
+    private Row convertToRow(KeyValue keyValue, TestKeyValueGenerator.GeneratorMode mode) {
+        byte kind = keyValue.valueKind().toByteValue();
+        RowData record = keyValue.value();
+        String rowKind = RowKind.fromByteValue(kind == 0 ? kind : 3).shortString();
+        if (mode == NON_PARTITIONED) {
+            return changelogRow(rowKind, record.getInt(0), record.getLong(1), record.getLong(2));
+        } else if (mode == SINGLE_PARTITIONED) {
+            return changelogRow(
+                    rowKind,
+                    record.getString(0).toString(),
+                    record.getInt(1),
+                    record.getLong(2),
+                    record.getLong(3));
+        }
+        return changelogRow(
+                rowKind,
+                record.getString(0).toString(),
+                record.getInt(1),
+                record.getInt(2),
+                record.getLong(3),
+                record.getLong(4));
+    }
+
+    private String kvAsString(KeyValue keyValue, TestKeyValueGenerator.GeneratorMode mode) {
+        RowData record = keyValue.value();
+        switch (mode) {
+            case NON_PARTITIONED:
+                return String.format(
+                        "(%d, %d, %d)", record.getInt(0), record.getLong(1), record.getLong(2));
+            case SINGLE_PARTITIONED:
+                return String.format(
+                        "('%s', %d, %d, %d)",
+                        record.getString(0),
+                        record.getInt(1),
+                        record.getLong(2),
+                        record.getLong(3));
+            case MULTI_PARTITIONED:
+                return String.format(
+                        "('%s', %d, %d, %d, %d)",
+                        record.getString(0),
+                        record.getInt(1),
+                        record.getInt(2),
+                        record.getLong(3),
+                        record.getLong(4));
+            default:
+                throw new UnsupportedOperationException("unsupported mode");
+        }
+    }
+
+    private String getSnapshotDir(String tableName) {
+        return path
+                + relativeTablePath(
+                        ObjectIdentifier.of(
+                                bEnv.getCurrentCatalog(), bEnv.getCurrentDatabase(), tableName))
+                + "/snapshot";
+    }
+
+    private Snapshot findLatestSnapshot(String tableName) throws IOException {
+        String snapshotDir = getSnapshotDir(tableName);
+        Long latest = SnapshotFinder.findLatest(new Path(URI.create(snapshotDir)));
+        return Snapshot.fromPath(
+                new Path(URI.create(snapshotDir + String.format("/snapshot-%d", latest))));
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 59f3d966..002480fd 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -44,13 +44,14 @@ public abstract class FileStoreTableITCase extends AbstractTestBase {
 
     protected TableEnvironment bEnv;
     protected TableEnvironment sEnv;
+    protected String path;
 
     @Before
     public void before() throws IOException {
         bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
         sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
         sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
-        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+        path = TEMPORARY_FOLDER.newFolder().toURI().toString();
         prepareEnv(bEnv, path);
         prepareEnv(sEnv, path);
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index ed756242..7716526a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -21,13 +21,14 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -36,11 +37,11 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -49,14 +50,22 @@ import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
 import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
+import static org.apache.flink.table.store.file.FileStoreOptions.path;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PART_TYPE;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.getPrimaryKeys;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
 import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
@@ -66,8 +75,11 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Test cases for {@link TableStoreManagedFactory}. */
 public class TableStoreManagedFactoryTest {
 
+    private static final String CATALOG = "catalog";
+    private static final String DATABASE = "database";
+    private static final String TABLE = "table";
     private static final ObjectIdentifier TABLE_IDENTIFIER =
-            ObjectIdentifier.of("catalog", "database", "table");
+            ObjectIdentifier.of(CATALOG, DATABASE, TABLE);
 
     private final TableStoreManagedFactory tableStoreManagedFactory =
             new TableStoreManagedFactory();
@@ -76,12 +88,12 @@ public class TableStoreManagedFactoryTest {
     private DynamicTableFactory.Context context;
 
     @ParameterizedTest
-    @MethodSource("providingOptions")
+    @MethodSource("provideOptionsToEnrich")
     public void testEnrichOptions(
             Map<String, String> sessionOptions,
             Map<String, String> tableOptions,
             Map<String, String> expectedEnrichedOptions) {
-        context = createTableContext(sessionOptions, tableOptions);
+        context = createNonEnrichedContext(sessionOptions, tableOptions);
         Map<String, String> actualEnrichedOptions = tableStoreManagedFactory.enrichOptions(context);
         assertThat(actualEnrichedOptions)
                 .containsExactlyInAnyOrderEntriesOf(expectedEnrichedOptions);
@@ -92,12 +104,12 @@ public class TableStoreManagedFactoryTest {
         Map<String, String> sessionMap = new HashMap<>();
         sessionMap.put("table-store.root-path", "my_path");
         sessionMap.put("table-store.path", "another_path");
-        context = createTableContext(sessionMap, emptyMap());
+        context = createNonEnrichedContext(sessionMap, emptyMap());
         assertThatThrownBy(() -> tableStoreManagedFactory.enrichOptions(context))
                 .hasMessage(
                         "Managed table can not contain table path. You need to remove path in table options or session config.");
 
-        context = createTableContext(emptyMap(), emptyMap());
+        context = createNonEnrichedContext(emptyMap(), emptyMap());
         assertThatThrownBy(() -> tableStoreManagedFactory.enrichOptions(context))
                 .hasMessage(
                         "Please specify a root path by setting session level configuration as `SET 'table-store.root-path' = '...'`.");
@@ -109,13 +121,13 @@ public class TableStoreManagedFactoryTest {
         sessionMap.put("table-store.root-path", "my_path");
         sessionMap.put("table-store.log.system", "kafka");
         sessionMap.put("table-store.log.topic", "my_topic");
-        context = createTableContext(sessionMap, emptyMap());
+        context = createNonEnrichedContext(sessionMap, emptyMap());
         assertThatThrownBy(() -> tableStoreManagedFactory.enrichOptions(context))
                 .hasMessage(
                         "Managed table can not contain custom topic. You need to remove topic in table options or session config.");
 
         sessionMap.remove("table-store.log.topic");
-        context = createTableContext(sessionMap, emptyMap());
+        context = createNonEnrichedContext(sessionMap, emptyMap());
         Map<String, String> enriched = tableStoreManagedFactory.enrichOptions(context);
 
         Map<String, String> expected = new HashMap<>();
@@ -126,13 +138,10 @@ public class TableStoreManagedFactoryTest {
     }
 
     @ParameterizedTest
-    @MethodSource("providingEnrichedOptionsForCreation")
+    @MethodSource("provideOptionsToCreate")
     public void testOnCreateTable(Map<String, String> enrichedOptions, boolean ignoreIfExists) {
-        context = enrichContext(createTableContext(emptyMap(), enrichedOptions));
-        Path expectedPath =
-                Paths.get(
-                        sharedTempDir.toAbsolutePath().toString(),
-                        relativeTablePath(TABLE_IDENTIFIER));
+        context = createEnrichedContext(enrichedOptions);
+        Path expectedPath = Paths.get(path(enrichedOptions).getPath());
         boolean exist = expectedPath.toFile().exists();
         if (ignoreIfExists || !exist) {
             tableStoreManagedFactory.onCreateTable(context, ignoreIfExists);
@@ -156,26 +165,11 @@ public class TableStoreManagedFactoryTest {
         }
     }
 
-    private DynamicTableFactory.Context enrichContext(DynamicTableFactory.Context context) {
-        Map<String, String> newOptions = tableStoreManagedFactory.enrichOptions(context);
-        ResolvedCatalogTable table = context.getCatalogTable().copy(newOptions);
-        return new FactoryUtil.DefaultDynamicTableContext(
-                context.getObjectIdentifier(),
-                table,
-                emptyMap(),
-                context.getConfiguration(),
-                context.getClassLoader(),
-                context.isTemporary());
-    }
-
     @ParameterizedTest
-    @MethodSource("providingEnrichedOptionsForDrop")
+    @MethodSource("provideOptionsToDrop")
     public void testOnDropTable(Map<String, String> enrichedOptions, boolean ignoreIfNotExists) {
-        context = enrichContext(createTableContext(emptyMap(), enrichedOptions));
-        Path expectedPath =
-                Paths.get(
-                        sharedTempDir.toAbsolutePath().toString(),
-                        relativeTablePath(TABLE_IDENTIFIER));
+        context = createEnrichedContext(enrichedOptions);
+        Path expectedPath = Paths.get(path(enrichedOptions).getPath());
         boolean exist = expectedPath.toFile().exists();
         if (exist || ignoreIfNotExists) {
             tableStoreManagedFactory.onDropTable(context, ignoreIfNotExists);
@@ -214,7 +208,7 @@ public class TableStoreManagedFactoryTest {
     }
 
     @ParameterizedTest
-    @MethodSource("providingResolvedTable")
+    @MethodSource("provideResolvedTable")
     public void testCreateAndCheckTableStore(
             RowType rowType,
             List<String> partitions,
@@ -223,18 +217,12 @@ public class TableStoreManagedFactoryTest {
         ResolvedCatalogTable catalogTable =
                 createResolvedTable(
                         singletonMap(
-                                "path", sharedTempDir.toAbsolutePath() + "/" + UUID.randomUUID()),
+                                PATH.key(),
+                                sharedTempDir.toAbsolutePath() + "/" + UUID.randomUUID()),
                         rowType,
                         partitions,
                         primaryKeys);
-        context =
-                new FactoryUtil.DefaultDynamicTableContext(
-                        TABLE_IDENTIFIER,
-                        catalogTable,
-                        emptyMap(),
-                        Configuration.fromMap(emptyMap()),
-                        Thread.currentThread().getContextClassLoader(),
-                        false);
+        context = createEnrichedContext(TABLE_IDENTIFIER, catalogTable);
         if (expectedResult.success) {
             tableStoreManagedFactory.onCreateTable(context, false);
             TableStore tableStore = AbstractTableStoreFactory.buildTableStore(context);
@@ -256,9 +244,59 @@ public class TableStoreManagedFactoryTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1, 2})
+    public void testOnCompactTable(int partitionNum) {
+        context = createEnrichedContext(Collections.emptyMap());
+
+        Map<String, String> partSpec =
+                partitionNum == 0
+                        ? Collections.emptyMap()
+                        : partitionNum == 1 ? of("foo", "bar") : of("foo", "bar", "meow", "burr");
+        CatalogPartitionSpec catalogPartSpec = new CatalogPartitionSpec(partSpec);
+        Map<String, String> newOptions =
+                tableStoreManagedFactory.onCompactTable(context, catalogPartSpec);
+        assertThat(newOptions)
+                .containsEntry(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
+        assertThat(newOptions)
+                .containsEntry(COMPACTION_PARTITION_SPEC.key(), JsonSerdeUtil.toJson(partSpec));
+    }
+
     // ~ Tools ------------------------------------------------------------------
 
-    private static Stream<Arguments> providingOptions() {
+    private static ResolvedCatalogTable getDummyTable(Map<String, String> tableOptions) {
+        return new ResolvedCatalogTable(
+                CatalogTable.of(Schema.derived(), "a comment", emptyList(), tableOptions),
+                ResolvedSchema.of(emptyList()));
+    }
+
+    private static DynamicTableFactory.Context createNonEnrichedContext(
+            Map<String, String> sessionOptions, Map<String, String> tableOptions) {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                TABLE_IDENTIFIER,
+                getDummyTable(tableOptions),
+                emptyMap(),
+                Configuration.fromMap(sessionOptions),
+                Thread.currentThread().getContextClassLoader(),
+                false);
+    }
+
+    private static DynamicTableFactory.Context createEnrichedContext(Map<String, String> options) {
+        return createEnrichedContext(TABLE_IDENTIFIER, getDummyTable(options));
+    }
+
+    private static DynamicTableFactory.Context createEnrichedContext(
+            ObjectIdentifier tableIdentifier, ResolvedCatalogTable catalogTable) {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                tableIdentifier,
+                catalogTable,
+                emptyMap(),
+                Configuration.fromMap(emptyMap()),
+                Thread.currentThread().getContextClassLoader(),
+                false);
+    }
+
+    private static Stream<Arguments> provideOptionsToEnrich() {
         Map<String, String> enrichedOptions =
                 of(
                         BUCKET.key(),
@@ -315,16 +353,17 @@ public class TableStoreManagedFactoryTest {
         return expected;
     }
 
-    private static Stream<Arguments> providingEnrichedOptionsForCreation() {
-        Map<String, String> enrichedOptions = new HashMap<>();
-        enrichedOptions.put(ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString());
+    private static Stream<Arguments> provideOptionsToCreate() {
+        Map<String, String> enrichedOptions =
+                of(ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString());
+        enrichedOptions = generateTablePath(enrichedOptions);
         return Stream.of(
                 Arguments.of(enrichedOptions, false),
                 Arguments.of(enrichedOptions, true),
                 Arguments.of(enrichedOptions, false));
     }
 
-    private static Stream<Arguments> providingEnrichedOptionsForDrop() {
+    private static Stream<Arguments> provideOptionsToDrop() {
         File tablePath =
                 Paths.get(
                                 sharedTempDir.toAbsolutePath().toString(),
@@ -333,30 +372,31 @@ public class TableStoreManagedFactoryTest {
         if (!tablePath.exists()) {
             tablePath.mkdirs();
         }
-        Map<String, String> enrichedOptions = new HashMap<>();
-        enrichedOptions.put(ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString());
+        Map<String, String> enrichedOptions =
+                of(ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString());
+        enrichedOptions = generateTablePath(enrichedOptions);
         return Stream.of(
                 Arguments.of(enrichedOptions, false),
                 Arguments.of(enrichedOptions, true),
                 Arguments.of(enrichedOptions, false));
     }
 
-    private static Stream<Arguments> providingResolvedTable() {
-        RowType rowType = TestKeyValueGenerator.ROW_TYPE;
+    private static Stream<Arguments> provideResolvedTable() {
+        RowType rowType = DEFAULT_ROW_TYPE;
         // success case
         Arguments arg0 =
                 Arguments.of(
                         rowType,
-                        Arrays.asList("dt", "hr"),
-                        Arrays.asList("dt", "hr", "shopId"), // pk is [dt, hr, shopId]
+                        DEFAULT_PART_TYPE.getFieldNames(), // partition is [dt, hr]
+                        getPrimaryKeys(MULTI_PARTITIONED), // pk is [dt, hr, shopId]
                         new TableStoreTestBase.ExpectedResult().success(true));
 
         // failed case: pk doesn't contain partition key
         Arguments arg1 =
                 Arguments.of(
                         rowType,
-                        Arrays.asList("dt", "hr"),
-                        Collections.singletonList("shopId"), // pk is [shopId]
+                        DEFAULT_PART_TYPE.getFieldNames(), // partition is [dt, hr]
+                        singletonList("shopId"), // pk is [shopId]
                         new TableStoreTestBase.ExpectedResult()
                                 .success(false)
                                 .expectedType(IllegalStateException.class)
@@ -367,13 +407,13 @@ public class TableStoreManagedFactoryTest {
         Arguments arg2 =
                 Arguments.of(
                         rowType,
-                        Arrays.asList("dt", "hr", "shopId"),
-                        Arrays.asList("dt", "hr", "shopId"), // pk is [dt, hr, shopId]
+                        DEFAULT_PART_TYPE.getFieldNames(), // partition is [dt, hr]
+                        DEFAULT_PART_TYPE.getFieldNames(), // pk is [dt, hr]
                         new TableStoreTestBase.ExpectedResult()
                                 .success(false)
                                 .expectedType(IllegalStateException.class)
                                 .expectedMessage(
-                                        "Primary key constraint [dt, hr, shopId] should not be same with partition fields [dt, hr, shopId],"
+                                        "Primary key constraint [dt, hr] should not be same with partition fields [dt, hr],"
                                                 + " this will result in only one record in a partition"));
 
         return Stream.of(arg0, arg1, arg2);
@@ -391,25 +431,6 @@ public class TableStoreManagedFactoryTest {
         return newOptions;
     }
 
-    private static DynamicTableFactory.Context createTableContext(
-            Map<String, String> sessionOptions, Map<String, String> tableOptions) {
-        ResolvedCatalogTable resolvedCatalogTable =
-                new ResolvedCatalogTable(
-                        CatalogTable.of(
-                                Schema.derived(),
-                                "a comment",
-                                Collections.emptyList(),
-                                tableOptions),
-                        ResolvedSchema.of(Collections.emptyList()));
-        return new FactoryUtil.DefaultDynamicTableContext(
-                TABLE_IDENTIFIER,
-                resolvedCatalogTable,
-                emptyMap(),
-                Configuration.fromMap(sessionOptions),
-                Thread.currentThread().getContextClassLoader(),
-                false);
-    }
-
     private static Map<String, String> of(String... kvs) {
         assert kvs != null && kvs.length % 2 == 0;
         Map<String, String> map = new HashMap<>();
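
A minimal sketch of how a downstream planner might consume the options produced by
onCompactTable, as exercised by testOnCompactTable above. The literal keys below are
assumptions standing in for COMPACTION_MANUAL_TRIGGERED.key() and
COMPACTION_PARTITION_SPEC.key() from TableStoreFactoryOptions.

import org.apache.flink.configuration.Configuration;

import java.util.HashMap;
import java.util.Map;

class CompactTriggerSketch {
    // Assumed literal keys; the real ones come from TableStoreFactoryOptions.
    private static final String MANUAL_TRIGGERED = "compaction.manual-triggered";
    private static final String PARTITION_SPEC = "compaction.partition-spec";

    /** Returns true when the enriched options mark the job as a manual compaction. */
    static boolean isManualCompaction(Map<String, String> enrichedOptions) {
        return Configuration.fromMap(enrichedOptions).getBoolean(MANUAL_TRIGGERED, false);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(MANUAL_TRIGGERED, "true");
        options.put(PARTITION_SPEC, "{\"foo\":\"bar\"}");
        System.out.println(isManualCompaction(options)); // true
    }
}
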
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index 47ba4fd0..35227a39 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -18,11 +18,17 @@
 
 package org.apache.flink.table.store.connector.sink;
 
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.table.catalog.CatalogLock;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
 import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
@@ -31,6 +37,7 @@ import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.UserCodeClassLoader;
 
 import org.junit.Assume;
 import org.junit.Before;
@@ -38,11 +45,14 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.Callable;
 
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
@@ -126,6 +136,8 @@ public class StoreSinkTest {
                         primaryKeys,
                         primaryKeys,
                         2,
+                        false,
+                        null,
                         () -> lock,
                         new HashMap<>(),
                         null);
@@ -187,6 +199,26 @@ public class StoreSinkTest {
                 .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
     }
 
+    @Test
+    public void testCreateCompactor() throws Exception {
+        StoreSink<?, ?> sink =
+                new StoreSink<>(
+                        identifier,
+                        fileStore,
+                        WriteMode.CHANGE_LOG,
+                        partitions,
+                        primaryKeys,
+                        primaryKeys,
+                        2,
+                        true,
+                        null,
+                        () -> lock,
+                        null,
+                        null);
+        StatefulPrecommittingSinkWriter<?> writer = sink.createWriter(initContext());
+        assertThat(writer).isInstanceOf(StoreSinkCompactor.class);
+    }
+
     private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
         writeAndCommit(
                 sink,
@@ -207,13 +239,14 @@ public class StoreSinkTest {
     }
 
     private List<Committable> write(StoreSink<?, ?> sink, RowData... rows) throws Exception {
-        StoreSinkWriter<?> writer = sink.createWriter(null);
+        StatefulPrecommittingSinkWriter<?> writer = sink.createWriter(null);
         for (RowData row : rows) {
             writer.write(row, null);
         }
 
-        List<Committable> committables = writer.prepareCommit();
-        Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>(writer.writers());
+        List<Committable> committables = ((StoreSinkWriter) writer).prepareCommit();
+        Map<BinaryRowData, Map<Integer, RecordWriter>> writers =
+                new HashMap<>(((StoreSinkWriter) writer).writers());
         assertThat(writers.size()).isGreaterThan(0);
 
         writer.close();
@@ -249,7 +282,7 @@ public class StoreSinkTest {
         assertThat(lock.closed).isTrue();
     }
 
-    private StoreSink<?, ?> newSink(Map<String, String> overwritePartition) {
+    private StoreSink<?, ?> newSink(@Nullable Map<String, String> overwritePartition) {
         return new StoreSink<>(
                 identifier,
                 fileStore,
@@ -258,6 +291,8 @@ public class StoreSinkTest {
                 primaryKeys,
                 primaryKeys,
                 2,
+                false,
+                null,
                 () -> lock,
                 overwritePartition,
                 null);
@@ -283,4 +318,49 @@ public class StoreSinkTest {
             closed = true;
         }
     }
+
+    private Sink.InitContext initContext() {
+        return new Sink.InitContext() {
+            @Override
+            public UserCodeClassLoader getUserCodeClassLoader() {
+                return null;
+            }
+
+            @Override
+            public MailboxExecutor getMailboxExecutor() {
+                return null;
+            }
+
+            @Override
+            public ProcessingTimeService getProcessingTimeService() {
+                return null;
+            }
+
+            @Override
+            public int getSubtaskId() {
+                return 0;
+            }
+
+            @Override
+            public int getNumberOfParallelSubtasks() {
+                return 1;
+            }
+
+            @Override
+            public SinkWriterMetricGroup metricGroup() {
+                return null;
+            }
+
+            @Override
+            public OptionalLong getRestoredCheckpointId() {
+                return OptionalLong.empty();
+            }
+
+            @Override
+            public SerializationSchema.InitializationContext
+                    asSerializationSchemaInitializationContext() {
+                return null;
+            }
+        };
+    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index b80462c3..5c8d255c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
+import org.apache.flink.table.store.file.writer.CompactWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -89,6 +90,15 @@ public class TestFileStore implements FileStore {
                     BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
                 return new TestRecordWriter(hasPk);
             }
+
+            @Override
+            public CompactWriter createCompactWriter(
+                    BinaryRowData partition,
+                    int bucket,
+                    ExecutorService compactExecutor,
+                    List<DataFileMeta> restoreFiles) {
+                throw new UnsupportedOperationException();
+            }
         };
     }
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
deleted file mode 100644
index a33816a9..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.source;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
-import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.utils.PartitionedManifestMeta;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link FileStoreSource}. */
-public class FileStoreSourceTest {
-
-    private static final RowType RECORD_TYPE =
-            RowType.of(
-                    new LogicalType[] {
-                        new IntType(), new VarCharType(), new DoubleType(), new VarCharType()
-                    },
-                    new String[] {"k0", "k1", "v0", "v1"});
-
-    @MethodSource("parameters")
-    @ParameterizedTest
-    public void testSerDe(boolean hasPk, boolean partitioned, boolean specified)
-            throws ClassNotFoundException, IOException {
-        FileStore fileStore = buildFileStore(hasPk, partitioned);
-        Long specifiedSnapshotId = specified ? 1L : null;
-        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries =
-                specified ? buildManifestEntries(hasPk, partitioned) : null;
-        PartitionedManifestMeta partitionedManifestMeta =
-                specified
-                        ? new PartitionedManifestMeta(specifiedSnapshotId, specifiedManifestEntries)
-                        : null;
-        FileStoreSource source =
-                new FileStoreSource(
-                        fileStore,
-                        WriteMode.CHANGE_LOG,
-                        !hasPk,
-                        true,
-                        Duration.ofSeconds(1).toMillis(),
-                        true,
-                        null,
-                        null,
-                        null,
-                        partitionedManifestMeta);
-        Object object = readObject(writeObject(source));
-        assertThat(object).isInstanceOf(FileStoreSource.class);
-        FileStoreSource deserialized = (FileStoreSource) object;
-        assertThat(deserialized.getBoundedness()).isEqualTo(source.getBoundedness());
-        if (specified) {
-            assertThat(deserialized.getSpecifiedPartManifests())
-                    .isEqualTo(source.getSpecifiedPartManifests());
-        } else {
-            assertThat(deserialized.getSpecifiedPartManifests()).isNull();
-        }
-    }
-
-    private byte[] writeObject(FileStoreSource source) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        oos.writeObject(source);
-        oos.close();
-        return baos.toByteArray();
-    }
-
-    private Object readObject(byte[] bytes) throws IOException, ClassNotFoundException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-        ObjectInputStream ois = new ObjectInputStream(bais);
-        Object object = ois.readObject();
-        ois.close();
-        return object;
-    }
-
-    public static Stream<Arguments> parameters() {
-        // hasPk, partitioned, specified
-        return Stream.of(
-                Arguments.of(true, true, false),
-                Arguments.of(true, false, false),
-                Arguments.of(false, false, false),
-                Arguments.of(false, true, false),
-                Arguments.of(true, true, true),
-                Arguments.of(true, false, true),
-                Arguments.of(false, false, true),
-                Arguments.of(false, true, true));
-    }
-
-    private FileStore buildFileStore(boolean hasPk, boolean partitioned) {
-        String user = "user";
-        RowType partitionType = getPartitionType(partitioned);
-        RowType keyType = getKeyType(hasPk);
-        RowType valueType = getValueType(hasPk);
-        MergeFunction mergeFunction =
-                hasPk ? new DeduplicateMergeFunction() : new ValueCountMergeFunction();
-        return new FileStoreImpl(
-                0,
-                new FileStoreOptions(new Configuration()),
-                WriteMode.CHANGE_LOG,
-                user,
-                partitionType,
-                keyType,
-                valueType,
-                mergeFunction);
-    }
-
-    private static Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> buildManifestEntries(
-            boolean hasPk, boolean partitioned) {
-        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> manifestEntries = new HashMap<>();
-        Map<Integer, List<DataFileMeta>> bucketEntries = new HashMap<>();
-        int totalBuckets = new Random().nextInt(10) + 1;
-        for (int bucket = 0; bucket < totalBuckets; bucket++) {
-            List<DataFileMeta> metaList = new ArrayList<>();
-            for (Tuple2<BinaryRowData, BinaryRowData> tuple : genMinMax(hasPk)) {
-                metaList.add(genDataFileMeta(hasPk, tuple.f0, tuple.f1));
-            }
-            bucketEntries.put(bucket, metaList);
-        }
-        genPartitionValues(partitioned)
-                .forEach(partValue -> manifestEntries.put(partValue, bucketEntries));
-        return manifestEntries;
-    }
-
-    private static List<BinaryRowData> genPartitionValues(boolean partitioned) {
-        RowDataSerializer partSerializer = new RowDataSerializer(getPartitionType(partitioned));
-        if (partitioned) {
-            int partSize = new Random().nextInt(10) + 1;
-            List<BinaryRowData> partKeys = new ArrayList<>();
-            for (int i = 0; i < partSize; i++) {
-                partKeys.add(
-                        partSerializer
-                                .toBinaryRow(
-                                        GenericRowData.of(
-                                                StringData.fromString(
-                                                        UUID.randomUUID().toString())))
-                                .copy());
-            }
-            return partKeys;
-        }
-        return Collections.singletonList(partSerializer.toBinaryRow(GenericRowData.of()).copy());
-    }
-
-    private static List<Tuple2<BinaryRowData, BinaryRowData>> genMinMax(boolean hasPk) {
-        RowDataSerializer keySerializer = new RowDataSerializer(getKeyType(hasPk));
-        int size = new Random().nextInt(20);
-        List<Tuple2<BinaryRowData, BinaryRowData>> minMaxKeys = new ArrayList<>();
-        for (int i = 0; i < size; i++) {
-            int k0 = new Random().nextInt(1000);
-            String k1 = UUID.randomUUID().toString();
-            BinaryRowData keyMin;
-            BinaryRowData keyMax;
-            if (hasPk) {
-                keyMin =
-                        keySerializer.toBinaryRow(GenericRowData.of(k0, StringData.fromString(k1)));
-                keyMax =
-                        keySerializer.toBinaryRow(
-                                GenericRowData.of(k0 + 100, StringData.fromString(k1)));
-            } else {
-                double v0 = new Random().nextDouble();
-                String v1 = UUID.randomUUID().toString();
-                keyMin =
-                        keySerializer.toBinaryRow(
-                                GenericRowData.of(
-                                        k0,
-                                        StringData.fromString(k1),
-                                        v0,
-                                        StringData.fromString(v1)));
-                keyMax =
-                        keySerializer.toBinaryRow(
-                                GenericRowData.of(
-                                        k0 + 100,
-                                        StringData.fromString(k1),
-                                        v0 + 1.5,
-                                        StringData.fromString(v1)));
-            }
-            minMaxKeys.add(Tuple2.of(keyMin, keyMax));
-        }
-        return minMaxKeys;
-    }
-
-    private static DataFileMeta genDataFileMeta(
-            boolean hasPk, BinaryRowData keyMin, BinaryRowData keyMax) {
-        long seqNumber = new Random().nextLong() + 1;
-        FieldStats k0Stats = new FieldStats(keyMin.getInt(0), keyMax.getInt(1), 0);
-        FieldStats k1Stats = new FieldStats(keyMin.getString(1), keyMax.getString(1), 0);
-        FieldStats v0Status =
-                new FieldStats(
-                        hasPk ? null : keyMin.getDouble(2), hasPk ? null : keyMax.getDouble(2), 0);
-        FieldStats v1Status =
-                new FieldStats(
-                        hasPk ? null : keyMin.getString(3), hasPk ? null : keyMax.getString(3), 0);
-        long count = new Random().nextLong();
-
-        FieldStatsArraySerializer keyStatsSer = new FieldStatsArraySerializer(getKeyType(hasPk));
-        FieldStatsArraySerializer valueStatsSer =
-                new FieldStatsArraySerializer(getValueType(hasPk));
-
-        FieldStats[] keyStats =
-                hasPk
-                        ? new FieldStats[] {k0Stats, k1Stats}
-                        : new FieldStats[] {
-                            k0Stats, k1Stats,
-                            v0Status, v1Status
-                        };
-        FieldStats[] valueStats =
-                hasPk
-                        ? new FieldStats[] {
-                            k0Stats, k1Stats,
-                            v0Status, v1Status
-                        }
-                        : new FieldStats[] {
-                            new FieldStats(count, count + new Random().nextInt(100), 0)
-                        };
-        return new DataFileMeta(
-                "data-" + UUID.randomUUID(),
-                new Random().nextInt(100),
-                new Random().nextInt(100),
-                keyMin,
-                keyMax,
-                keyStatsSer.toBinary(keyStats),
-                valueStatsSer.toBinary(valueStats),
-                seqNumber,
-                seqNumber + new Random().nextInt(100),
-                new Random().nextInt(4));
-    }
-
-    private static RowType getKeyType(boolean hasPk) {
-        return hasPk
-                ? RowType.of(
-                        new LogicalType[] {new IntType(), new VarCharType()},
-                        new String[] {"k0", "k1"})
-                : RECORD_TYPE;
-    }
-
-    private static RowType getValueType(boolean hasPk) {
-        return hasPk
-                ? RECORD_TYPE
-                : RowType.of(
-                        new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
-    }
-
-    private static RowType getPartitionType(boolean partitioned) {
-        return partitioned
-                ? RowType.of(new LogicalType[] {new VarCharType()}, new String[] {"k1"})
-                : RowType.of();
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index ea196f4e..ee7428a9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -19,11 +19,7 @@
 package org.apache.flink.table.store.file;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
-import org.apache.flink.table.runtime.generated.RecordComparator;
-import org.apache.flink.table.store.codegen.CodeGenUtils;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
@@ -36,13 +32,16 @@ import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
 import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
 import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.Comparator;
 import java.util.List;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /** File store implementation. */
@@ -55,8 +54,8 @@ public class FileStoreImpl implements FileStore {
     private final RowType partitionType;
     private final RowType keyType;
     private final RowType valueType;
+    private final Supplier<Comparator<RowData>> keyComparatorSupplier;
     @Nullable private final MergeFunction mergeFunction;
-    private final GeneratedRecordComparator genRecordComparator;
 
     public FileStoreImpl(
             long schemaId,
@@ -75,9 +74,7 @@ public class FileStoreImpl implements FileStore {
         this.keyType = keyType;
         this.valueType = valueType;
         this.mergeFunction = mergeFunction;
-        this.genRecordComparator =
-                CodeGenUtils.generateRecordComparator(
-                        new TableConfig(), keyType.getChildren(), "KeyComparator");
+        this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
     }
 
     public FileStorePathFactory pathFactory() {
@@ -106,17 +103,13 @@ public class FileStoreImpl implements FileStore {
         return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
     }
 
-    private RecordComparator newKeyComparator() {
-        return genRecordComparator.newInstance(Thread.currentThread().getContextClassLoader());
-    }
-
     @Override
     public FileStoreWriteImpl newWrite() {
         return new FileStoreWriteImpl(
                 writeMode,
                 keyType,
                 valueType,
-                this::newKeyComparator,
+                keyComparatorSupplier,
                 mergeFunction,
                 options.fileFormat(),
                 pathFactory(),
@@ -130,7 +123,7 @@ public class FileStoreImpl implements FileStore {
                 writeMode,
                 keyType,
                 valueType,
-                newKeyComparator(),
+                keyComparatorSupplier.get(),
                 mergeFunction,
                 options.fileFormat(),
                 pathFactory());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
index 965d1d82..1cf74743 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -35,6 +35,7 @@ import java.util.Objects;
  */
 public class Increment {
 
+    private static final List<DataFileMeta> EMPTY_NEW_FILES = Collections.emptyList();
     private static final List<DataFileMeta> EMPTY_COMPACT_BEFORE = Collections.emptyList();
     private static final List<DataFileMeta> EMPTY_COMPACT_AFTER = Collections.emptyList();
 
@@ -48,6 +49,11 @@ public class Increment {
         return new Increment(newFiles, EMPTY_COMPACT_BEFORE, EMPTY_COMPACT_AFTER);
     }
 
+    public static Increment forCompact(
+            List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) {
+        return new Increment(EMPTY_NEW_FILES, compactBefore, compactAfter);
+    }
+
     public Increment(
             List<DataFileMeta> newFiles,
             List<DataFileMeta> beCompacted,
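
For illustration, a compact-only Increment carries no new files, so (per the
FileStoreCommitImpl change later in this commit) committing it yields only a COMPACT
snapshot and skips the APPEND one. The parameters in this sketch are placeholders for
real compaction results.

import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.Increment;

import java.util.List;

class CompactIncrementSketch {
    /** rewrittenInputs/rewrittenOutputs stand in for the files consumed/produced by a rewrite. */
    static Increment toCompactIncrement(
            List<DataFileMeta> rewrittenInputs, List<DataFileMeta> rewrittenOutputs) {
        // newFiles stays empty, so the resulting commit contains no APPEND changes.
        return Increment.forCompact(rewrittenInputs, rewrittenOutputs);
    }
}
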
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
index 7879965e..133e2a12 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -97,40 +97,51 @@ public class CompactManager {
 
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug(
-                                        "Submit compaction with files (level, size): "
+                                        "Submit compaction with files (name, level, size): "
                                                 + levels.levelSortedRuns().stream()
                                                         .flatMap(lsr -> lsr.run().files().stream())
                                                         .map(
                                                                 file ->
                                                                         String.format(
-                                                                                "(%d, %d)",
-                                                                                file.level(),
-                                                                                file.fileSize()))
-                                                        .collect(Collectors.joining(", ")));
-                                LOG.debug(
-                                        "Pick these files (level, size) for compaction: "
-                                                + unit.files().stream()
-                                                        .map(
-                                                                file ->
-                                                                        String.format(
-                                                                                "(%d, %d)",
+                                                                                "(%s, %d, %d)",
+                                                                                file.fileName(),
                                                                                 file.level(),
                                                                                 file.fileSize()))
                                                         .collect(Collectors.joining(", ")));
                             }
-
-                            CompactTask task = new CompactTask(unit, dropDelete);
-                            taskFuture = executor.submit(task);
+                            submitCompaction(unit, dropDelete);
                         });
     }
 
+    /** Submit a compaction over a caller-picked {@link CompactUnit}. */
+    public void submitCompaction(CompactUnit unit, boolean dropDelete) {
+        CompactTask task = new CompactTask(unit, dropDelete);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Pick these files (name, level, size) for compaction: {}",
+                    unit.files().stream()
+                            .map(
+                                    file ->
+                                            String.format(
+                                                    "(%s, %d, %d)",
+                                                    file.fileName(), file.level(), file.fileSize()))
+                            .collect(Collectors.joining(", ")));
+        }
+        taskFuture = executor.submit(task);
+    }
+
     /** Finish current task, and update result files to {@link Levels}. */
     public Optional<CompactResult> finishCompaction(Levels levels, boolean blocking)
             throws ExecutionException, InterruptedException {
+        Optional<CompactResult> result = finishCompaction(blocking);
+        result.ifPresent(r -> levels.update(r.before(), r.after()));
+        return result;
+    }
+
+    /** Finish the current task without updating any {@link Levels}. */
+    public Optional<CompactResult> finishCompaction(boolean blocking)
+            throws ExecutionException, InterruptedException {
         if (taskFuture != null) {
             if (blocking || taskFuture.isDone()) {
                 CompactResult result = taskFuture.get();
-                levels.update(result.before(), result.after());
                 taskFuture = null;
                 return Optional.of(result);
             }
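
A short sketch of driving a manually triggered full compaction through the split API
above. The manager, executor and file list are assumed to be provided by the caller,
and dropDelete is set to true on the assumption that the rewrite targets the highest
level.

import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

class ManualCompactionSketch {
    static Optional<CompactResult> compactAll(
            CompactManager manager, List<DataFileMeta> allFiles, int numLevels)
            throws ExecutionException, InterruptedException {
        // Rewrite every restored file into the top level as one task.
        CompactUnit unit = CompactUnit.fromFiles(numLevels - 1, allFiles);
        manager.submitCompaction(unit, /* dropDelete */ true);
        // Block for the result; the Levels-free overload is used because a
        // compact-only writer keeps no in-memory Levels structure.
        return manager.finishCompaction(/* blocking */ true);
    }
}
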
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
index dd1645a5..af8510e1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
@@ -25,7 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 /** A files unit for compaction. */
-interface CompactUnit {
+public interface CompactUnit {
 
     int outputLevel();
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index e8e09fc1..1c45ecfa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -170,12 +170,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
-        tryCommit(
-                appendChanges,
-                committable.identifier(),
-                committable.logOffsets(),
-                Snapshot.CommitKind.APPEND,
-                false);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(
+                    appendChanges,
+                    committable.identifier(),
+                    committable.logOffsets(),
+                    Snapshot.CommitKind.APPEND,
+                    false);
+        }
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 253bf8f6..d9e020b1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -19,8 +19,10 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 /** Write operation which provides {@link RecordWriter} creation. */
@@ -32,4 +34,11 @@ public interface FileStoreWrite {
     /** Create an empty {@link RecordWriter} from partition and bucket. */
     RecordWriter createEmptyWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor);
+
+    /** Create a compaction {@link RecordWriter} from partition, bucket and restored files. */
+    RecordWriter createCompactWriter(
+            BinaryRowData partition,
+            int bucket,
+            ExecutorService compactExecutor,
+            List<DataFileMeta> restoredFiles);
 }
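
A usage sketch of the new interface method. The partition, bucket and restored files
are assumed to come from a scan of the current snapshot, and the returned writer is
assumed to follow the usual RecordWriter lifecycle (prepareCommit, then close).

import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.writer.RecordWriter;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompactWriterUsageSketch {
    static Increment compactBucket(
            FileStoreWrite write,
            BinaryRowData partition,
            int bucket,
            List<DataFileMeta> restoredFiles)
            throws Exception {
        ExecutorService compactExecutor = Executors.newSingleThreadExecutor();
        RecordWriter writer =
                write.createCompactWriter(partition, bucket, compactExecutor, restoredFiles);
        try {
            // Only compactBefore/compactAfter are expected to be populated.
            return writer.prepareCommit();
        } finally {
            writer.close();
            compactExecutor.shutdownNow();
        }
    }
}
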
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 45d47206..53c2f3b3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
@@ -35,11 +34,13 @@ import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
 import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
+import org.apache.flink.table.store.file.writer.CompactWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -48,6 +49,7 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
@@ -136,6 +138,22 @@ public class FileStoreWriteImpl implements FileStoreWrite {
         return createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
     }
 
+    @Override
+    public RecordWriter createCompactWriter(
+            BinaryRowData partition,
+            int bucket,
+            ExecutorService compactExecutor,
+            List<DataFileMeta> restoredFiles) {
+        return new CompactWriter(
+                CompactUnit.fromFiles(options.numLevels - 1, restoredFiles),
+                createCompactManager(
+                        partition,
+                        bucket,
+                        (numLevels, runs) ->
+                                Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs)),
+                        compactExecutor));
+    }
+
     private RecordWriter createMergeTreeWriter(
             BinaryRowData partition,
             int bucket,
@@ -154,7 +172,14 @@ public class FileStoreWriteImpl implements FileStoreWrite {
                         dataFileWriter.valueType(),
                         options.writeBufferSize,
                         options.pageSize),
-                createCompactManager(partition, bucket, compactExecutor),
+                createCompactManager(
+                        partition,
+                        bucket,
+                        new UniversalCompaction(
+                                options.maxSizeAmplificationPercent,
+                                options.sizeRatio,
+                                options.numSortedRunCompactionTrigger),
+                        compactExecutor),
                 new Levels(keyComparator, restoreFiles, options.numLevels),
                 maxSequenceNumber,
                 keyComparator,
@@ -165,18 +190,16 @@ public class FileStoreWriteImpl implements FileStoreWrite {
     }
 
     private CompactManager createCompactManager(
-            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-        CompactStrategy compactStrategy =
-                new UniversalCompaction(
-                        options.maxSizeAmplificationPercent,
-                        options.sizeRatio,
-                        options.numSortedRunCompactionTrigger);
+            BinaryRowData partition,
+            int bucket,
+            CompactStrategy compactStrategy,
+            ExecutorService compactExecutor) {
         DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
         CompactManager.Rewriter rewriter =
                 (outputLevel, dropDelete, sections) ->
                         dataFileWriter.write(
-                                new RecordReaderIterator<KeyValue>(
+                                new RecordReaderIterator<>(
                                         new MergeTreeReader(
                                                 sections,
                                                 dropDelete,
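
One design note on the hunk above: the compact writer is wired with a pick-everything
strategy instead of UniversalCompaction, so a single task rewrites all restored runs
into the highest level. A sketch of that strategy in isolation (class name is
illustrative):

import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;

import java.util.Optional;

class FullCompactionStrategySketch {
    /** Same lambda as in createCompactWriter: pick every sorted run into one top-level unit. */
    static CompactStrategy pickEverything() {
        return (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
    }
}
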
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index 0656e2f6..377d0fac 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -58,6 +58,7 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
 
     @Nullable
     public Predicate fromMap(Map<String, String> map, RowType rowType) {
+        // TODO: It is somewhat misleading that an empty map creates a null predicate filter
         List<String> fieldNames = rowType.getFieldNames();
         Predicate predicate = null;
         for (Map.Entry<String, String> entry : map.entrySet()) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 7b182acd..4f79a568 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -68,21 +68,26 @@ public class FileStorePathFactory {
         this.root = root;
         this.uuid = UUID.randomUUID().toString();
 
-        String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
-        this.partitionComputer =
-                new RowDataPartitionComputer(
-                        defaultPartValue,
-                        partitionColumns,
-                        partitionType.getFields().stream()
-                                .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
-                                .toArray(DataType[]::new),
-                        partitionColumns);
+        this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
         this.formatIdentifier = formatIdentifier;
 
         this.manifestFileCount = new AtomicInteger(0);
         this.manifestListCount = new AtomicInteger(0);
     }
 
+    @VisibleForTesting
+    public static RowDataPartitionComputer getPartitionComputer(
+            RowType partitionType, String defaultPartValue) {
+        String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
+        return new RowDataPartitionComputer(
+                defaultPartValue,
+                partitionColumns,
+                partitionType.getFields().stream()
+                        .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
+                        .toArray(DataType[]::new),
+                partitionColumns);
+    }
+
     public Path newManifestFile() {
         return new Path(
                 root + "/manifest/manifest-" + uuid + "-" + manifestFileCount.getAndIncrement());
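
To illustrate the newly extracted helper, a small sketch assuming a (dt STRING, hr INT)
partition type and a placeholder default partition value; generatePartValues is
Flink's PartitionComputer API and is assumed to be what callers ultimately invoke on
the returned computer.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import java.util.LinkedHashMap;

class PartitionComputerSketch {
    public static void main(String[] args) throws Exception {
        RowType partitionType =
                RowType.of(
                        new LogicalType[] {new VarCharType(), new IntType()},
                        new String[] {"dt", "hr"});
        // "__DEFAULT__" is a placeholder; the store's real default comes from its options.
        LinkedHashMap<String, String> partValues =
                FileStorePathFactory.getPartitionComputer(partitionType, "__DEFAULT__")
                        .generatePartValues(
                                GenericRowData.of(StringData.fromString("20220101"), 10));
        System.out.println(partValues); // e.g. {dt=20220101, hr=10}
    }
}
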
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java
new file mode 100644
index 00000000..1b03d88b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+/** A {@link Supplier} that returns the comparator for the file store key. */
+public class KeyComparatorSupplier implements SerializableSupplier<Comparator<RowData>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final GeneratedRecordComparator genRecordComparator;
+
+    public KeyComparatorSupplier(RowType keyType) {
+        genRecordComparator =
+                CodeGenUtils.generateRecordComparator(
+                        new TableConfig(), keyType.getChildren(), "KeyComparator");
+    }
+
+    @Override
+    public RecordComparator get() {
+        return genRecordComparator.newInstance(Thread.currentThread().getContextClassLoader());
+    }
+}
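
KeyComparatorSupplier wraps the code-generated key comparator so it can be created once on the client and shipped to tasks as a serializable supplier. A short sketch of that contract follows, assuming a hypothetical two-field key type; the generated RecordComparator is usable as a Comparator<RowData>, per the supplier's declared type.

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;

    import java.util.Comparator;

    public class KeyComparatorSketch {
        public static void main(String[] args) {
            // Hypothetical key type: (shopId INT NOT NULL, orderId BIGINT NOT NULL).
            RowType keyType = RowType.of(new IntType(false), new BigIntType(false));
            // get() instantiates the generated comparator with the current context
            // class loader, so it can be invoked lazily on the task side.
            Comparator<RowData> comparator = new KeyComparatorSupplier(keyType).get();
            int result =
                    comparator.compare(GenericRowData.of(1, 1L), GenericRowData.of(1, 2L));
            System.out.println(result); // negative: the first key sorts lower
        }
    }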
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionedManifestMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionedManifestMeta.java
deleted file mode 100644
index e867ba4b..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionedManifestMeta.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.utils;
-
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializeBinaryRow;
-import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBinaryRow;
-
-/** Manifest entries per partitioned with the corresponding snapshot id. */
-public class PartitionedManifestMeta implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The latest snapshot id seen at planning phase when manual compaction is triggered. */
-    private final Long snapshotId;
-
-    /** The manifest entries collected at planning phase when manual compaction is triggered. */
-    private transient Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> manifestEntries;
-
-    public PartitionedManifestMeta(
-            Long snapshotId,
-            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries) {
-        Preconditions.checkNotNull(snapshotId, "Specified snapshot should not be null.");
-        Preconditions.checkNotNull(
-                specifiedManifestEntries, "Specified manifest entries should not be null.");
-        this.snapshotId = snapshotId;
-        this.manifestEntries = specifiedManifestEntries;
-    }
-
-    public Long getSnapshotId() {
-        return snapshotId;
-    }
-
-    public Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> getManifestEntries() {
-        return manifestEntries;
-    }
-
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.defaultWriteObject();
-
-        DataFileMetaSerializer metaSerializer = new DataFileMetaSerializer();
-        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
-        view.writeInt(manifestEntries.size());
-        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
-                manifestEntries.entrySet()) {
-            serializeBinaryRow(partEntry.getKey(), view);
-            Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
-            view.writeInt(bucketEntry.size());
-            for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
-                view.writeInt(entry.getKey());
-                view.writeInt(entry.getValue().size());
-                for (DataFileMeta meta : entry.getValue()) {
-                    metaSerializer.serialize(meta, view);
-                }
-            }
-        }
-    }
-
-    private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
-        in.defaultReadObject();
-
-        manifestEntries = new HashMap<>();
-        DataFileMetaSerializer metaSerializer = new DataFileMetaSerializer();
-        DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
-        int partitionNum = view.readInt();
-        for (int i = 0; i < partitionNum; i++) {
-            BinaryRowData partition = deserializeBinaryRow(view);
-            Map<Integer, List<DataFileMeta>> bucketEntry = new HashMap<>();
-            int bucketNum = view.readInt();
-            for (int j = 0; j < bucketNum; j++) {
-                int bucket = view.readInt();
-                int entryNum = view.readInt();
-                List<DataFileMeta> metas = new ArrayList<>();
-                for (int k = 0; k < entryNum; k++) {
-                    metas.add(metaSerializer.deserialize(view));
-                }
-                bucketEntry.put(bucket, metas);
-            }
-            manifestEntries.put(partition, bucketEntry);
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof PartitionedManifestMeta)) {
-            return false;
-        }
-        PartitionedManifestMeta that = (PartitionedManifestMeta) o;
-        return snapshotId.equals(that.snapshotId) && manifestEntries.equals(that.manifestEntries);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(snapshotId, manifestEntries);
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
new file mode 100644
index 00000000..de625beb
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+    private final CompactUnit unit;
+    private final CompactManager compactManager;
+
+    public CompactWriter(CompactUnit unit, CompactManager compactManager) {
+        this.unit = unit;
+        this.compactManager = compactManager;
+    }
+
+    @Override
+    public Increment prepareCommit() throws IOException, InterruptedException {
+        List<DataFileMeta> compactBefore = new ArrayList<>();
+        List<DataFileMeta> compactAfter = new ArrayList<>();
+        if (compactManager.isCompactionFinished()) {
+            compactManager.submitCompaction(unit, true);
+            try {
+                compactManager
+                        .finishCompaction(true)
+                        .ifPresent(
+                                result -> {
+                                    compactBefore.addAll(result.before());
+                                    compactAfter.addAll(result.after());
+                                });
+                return Increment.forCompact(compactBefore, compactAfter);
+            } catch (ExecutionException e) {
+                throw new IOException(e.getCause());
+            }
+        }
+        throw new IllegalStateException("Compact manager should have finished previous task.");
+    }
+
+    @Override
+    public List<DataFileMeta> close() throws Exception {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+        // nothing to write
+    }
+
+    @Override
+    public void sync() throws Exception {}
+}
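
CompactWriter turns a manual compaction into an ordinary RecordWriter: write() and sync() are no-ops, and all the work happens in prepareCommit(), which submits the CompactUnit and returns an Increment carrying only the compact-before and compact-after file lists. A hypothetical caller-side sketch of that contract (the driver class and method below are illustrative, not part of this patch):

    import org.apache.flink.table.store.file.mergetree.Increment;
    import org.apache.flink.table.store.file.writer.RecordWriter;

    public final class CompactDriverSketch {

        private CompactDriverSketch() {}

        /** Runs one compact-only cycle and returns the rewritten-file increment. */
        public static Increment compactOnce(RecordWriter writer) throws Exception {
            // For a CompactWriter this submits the unit, blocks on the result, and
            // throws IllegalStateException if a previous compaction is still pending.
            Increment increment = writer.prepareCommit();
            // No new data files were produced, so close() returns an empty list.
            writer.close();
            return increment;
        }
    }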
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/KeyValueSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/KeyValueSerializerTest.java
index 27563671..75a44155 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/KeyValueSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/KeyValueSerializerTest.java
@@ -32,7 +32,7 @@ public class KeyValueSerializerTest extends ObjectSerializerTestBase<KeyValue> {
     @Override
     protected ObjectSerializer<KeyValue> serializer() {
         return new KeyValueSerializer(
-                TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+                TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.DEFAULT_ROW_TYPE);
     }
 
     @Override
@@ -47,7 +47,7 @@ public class KeyValueSerializerTest extends ObjectSerializerTestBase<KeyValue> {
                                 expected,
                                 actual,
                                 TestKeyValueGenerator.KEY_SERIALIZER,
-                                TestKeyValueGenerator.ROW_SERIALIZER))
+                                TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER))
                 .isTrue();
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index bb73ea79..29dc549d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file;
 
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.RecordComparator;
@@ -34,10 +35,16 @@ import org.apache.flink.table.types.logical.VarCharType;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED;
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.SINGLE_PARTITIONED;
 
 /** Random {@link KeyValue} generator. */
 public class TestKeyValueGenerator {
@@ -53,7 +60,7 @@ public class TestKeyValueGenerator {
     // * itemId: long, any value
     // * price & amount: int[], [1 ~ 100, 1 ~ 10]
     // * comment: string, length from 10 to 1000
-    public static final RowType ROW_TYPE =
+    public static final RowType DEFAULT_ROW_TYPE =
             RowType.of(
                     new LogicalType[] {
                         new VarCharType(false, 8),
@@ -67,18 +74,42 @@ public class TestKeyValueGenerator {
                     new String[] {
                         "dt", "hr", "shopId", "orderId", "itemId", "priceAmount", "comment"
                     });
-    public static final RowType PARTITION_TYPE =
+    public static final RowType DEFAULT_PART_TYPE =
+            new RowType(DEFAULT_ROW_TYPE.getFields().subList(0, 2));
+
+    public static final RowType SINGLE_PARTITIONED_ROW_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        new VarCharType(false, 8),
+                        new IntType(false),
+                        new BigIntType(false),
+                        new BigIntType(),
+                        new ArrayType(new IntType()),
+                        new VarCharType(Integer.MAX_VALUE)
+                    },
+                    new String[] {"dt", "shopId", "orderId", "itemId", "priceAmount", "comment"});
+    public static final RowType SINGLE_PARTITIONED_PART_TYPE =
+            RowType.of(SINGLE_PARTITIONED_ROW_TYPE.getTypeAt(0));
+
+    public static final RowType NON_PARTITIONED_ROW_TYPE =
             RowType.of(
-                    new LogicalType[] {new VarCharType(false, 8), new IntType(false)},
-                    new String[] {"dt", "hr"});
+                    new LogicalType[] {
+                        new IntType(false),
+                        new BigIntType(false),
+                        new BigIntType(),
+                        new ArrayType(new IntType()),
+                        new VarCharType(Integer.MAX_VALUE)
+                    },
+                    new String[] {"shopId", "orderId", "itemId", "priceAmount", "comment"});
+    public static final RowType NON_PARTITIONED_PART_TYPE = RowType.of();
+
     public static final RowType KEY_TYPE =
             RowType.of(
                     new LogicalType[] {new IntType(false), new BigIntType(false)},
                     new String[] {"key_shopId", "key_orderId"});
 
-    public static final RowDataSerializer ROW_SERIALIZER = new RowDataSerializer(ROW_TYPE);
-    private static final RowDataSerializer PARTITION_SERIALIZER =
-            new RowDataSerializer(PARTITION_TYPE);
+    public static final RowDataSerializer DEFAULT_ROW_SERIALIZER =
+            new RowDataSerializer(DEFAULT_ROW_TYPE);
     public static final RowDataSerializer KEY_SERIALIZER = new RowDataSerializer(KEY_TYPE);
     public static final RecordComparator KEY_COMPARATOR =
             (a, b) -> {
@@ -89,6 +120,7 @@ public class TestKeyValueGenerator {
                 return Long.compare(a.getLong(1), b.getLong(1));
             };
 
+    private final GeneratorMode mode;
     private final Random random;
 
     private final List<Order> addedOrders;
@@ -96,13 +128,42 @@ public class TestKeyValueGenerator {
 
     private long sequenceNumber;
 
+    private final RowDataSerializer rowSerializer;
+    private final RowDataSerializer partitionSerializer;
+
     public TestKeyValueGenerator() {
+        this(MULTI_PARTITIONED);
+    }
+
+    public TestKeyValueGenerator(GeneratorMode mode) {
+        this.mode = mode;
         this.random = new Random();
 
         this.addedOrders = new ArrayList<>();
         this.deletedOrders = new ArrayList<>();
 
         this.sequenceNumber = 0;
+
+        RowType rowType;
+        RowType partitionType;
+        switch (mode) {
+            case NON_PARTITIONED:
+                rowType = NON_PARTITIONED_ROW_TYPE;
+                partitionType = NON_PARTITIONED_PART_TYPE;
+                break;
+            case SINGLE_PARTITIONED:
+                rowType = SINGLE_PARTITIONED_ROW_TYPE;
+                partitionType = SINGLE_PARTITIONED_PART_TYPE;
+                break;
+            case MULTI_PARTITIONED:
+                rowType = DEFAULT_ROW_TYPE;
+                partitionType = DEFAULT_PART_TYPE;
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported generator mode: " + mode);
+        }
+        rowSerializer = new RowDataSerializer(rowType);
+        partitionSerializer = new RowDataSerializer(partitionType);
     }
 
     public KeyValue next() {
@@ -140,31 +201,65 @@ public class TestKeyValueGenerator {
                                 .copy(),
                         sequenceNumber++,
                         kind,
-                        ROW_SERIALIZER
-                                .toBinaryRow(
-                                        GenericRowData.of(
-                                                StringData.fromString(order.dt),
-                                                order.hr,
-                                                order.shopId,
-                                                order.orderId,
-                                                order.itemId,
-                                                order.priceAmount == null
-                                                        ? null
-                                                        : new GenericArrayData(order.priceAmount),
-                                                StringData.fromString(order.comment)))
-                                .copy());
+                        rowSerializer.toBinaryRow(convertToRow(order)).copy());
+    }
+
+    private RowData convertToRow(Order order) {
+        List<Object> values =
+                new ArrayList<>(
+                        Arrays.asList(
+                                order.shopId,
+                                order.orderId,
+                                order.itemId,
+                                order.priceAmount == null
+                                        ? null
+                                        : new GenericArrayData(order.priceAmount),
+                                StringData.fromString(order.comment)));
+
+        if (mode == MULTI_PARTITIONED) {
+            values.add(0, StringData.fromString(order.dt));
+            values.add(1, order.hr);
+        } else if (mode == SINGLE_PARTITIONED) {
+            values.add(0, StringData.fromString(order.dt));
+        }
+        return GenericRowData.of(values.toArray(new Object[0]));
     }
 
     public BinaryRowData getPartition(KeyValue kv) {
-        return PARTITION_SERIALIZER
-                .toBinaryRow(GenericRowData.of(kv.value().getString(0), kv.value().getInt(1)))
-                .copy();
+        Object[] values;
+        if (mode == MULTI_PARTITIONED) {
+            values = new Object[] {kv.value().getString(0), kv.value().getInt(1)};
+        } else if (mode == SINGLE_PARTITIONED) {
+            values = new Object[] {kv.value().getString(0)};
+        } else {
+            values = new Object[0];
+        }
+        return partitionSerializer.toBinaryRow(GenericRowData.of(values)).copy();
     }
 
-    public static Map<String, String> toPartitionMap(BinaryRowData partition) {
+    public static List<String> getPrimaryKeys(GeneratorMode mode) {
+        List<String> trimmedPk =
+                KEY_TYPE.getFieldNames().stream()
+                        .map(f -> f.replaceFirst("key_", ""))
+                        .collect(Collectors.toList());
+        if (mode != NON_PARTITIONED) {
+            trimmedPk = new ArrayList<>(trimmedPk);
+            trimmedPk.addAll(
+                    mode == MULTI_PARTITIONED
+                            ? DEFAULT_PART_TYPE.getFieldNames()
+                            : SINGLE_PARTITIONED_PART_TYPE.getFieldNames());
+        }
+        return trimmedPk;
+    }
+
+    public static Map<String, String> toPartitionMap(BinaryRowData partition, GeneratorMode mode) {
         Map<String, String> map = new HashMap<>();
-        map.put("dt", partition.getString(0).toString());
-        map.put("hr", String.valueOf(partition.getInt(1)));
+        if (mode == MULTI_PARTITIONED) {
+            map.put("dt", partition.getString(0).toString());
+            map.put("hr", String.valueOf(partition.getInt(1)));
+        } else if (mode == SINGLE_PARTITIONED) {
+            map.put("dt", partition.getString(0).toString());
+        }
         return map;
     }
 
@@ -220,4 +315,11 @@ public class TestKeyValueGenerator {
             comment = random.nextInt(10) == 0 ? null : randomString(random.nextInt(1001 - 10) + 10);
         }
     }
+
+    /** Generator mode for {@link TestKeyValueGenerator}. */
+    public enum GeneratorMode {
+        NON_PARTITIONED,
+        SINGLE_PARTITIONED,
+        MULTI_PARTITIONED
+    }
 }
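
The generator now supports three partitioning modes, and the helpers that derive partition specs and primary keys take the mode explicitly. A small sketch using only the APIs shown in this diff:

    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.store.file.KeyValue;
    import org.apache.flink.table.store.file.TestKeyValueGenerator;
    import org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode;

    import java.util.List;
    import java.util.Map;

    public class GeneratorModeSketch {
        public static void main(String[] args) {
            TestKeyValueGenerator gen =
                    new TestKeyValueGenerator(GeneratorMode.SINGLE_PARTITIONED);
            KeyValue kv = gen.next();
            // In SINGLE_PARTITIONED mode the partition row contains only "dt".
            BinaryRowData partition = gen.getPartition(kv);
            Map<String, String> partSpec =
                    TestKeyValueGenerator.toPartitionMap(
                            partition, GeneratorMode.SINGLE_PARTITIONED);
            // Primary key = trimmed key fields ("shopId", "orderId") plus the partition field.
            List<String> primaryKeys =
                    TestKeyValueGenerator.getPrimaryKeys(GeneratorMode.SINGLE_PARTITIONED);
            System.out.println(partSpec + " " + primaryKeys);
        }
    }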
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
index 800c565a..781a91e6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
@@ -81,7 +81,7 @@ public class DataFileTest {
 
         checkRollingFiles(
                 TestKeyValueGenerator.KEY_TYPE,
-                TestKeyValueGenerator.ROW_TYPE,
+                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                 data.meta,
                 actualMetas,
                 writer.suggestedFileSize());
@@ -91,7 +91,7 @@ public class DataFileTest {
                 data,
                 actualMetas,
                 TestKeyValueGenerator.KEY_SERIALIZER,
-                TestKeyValueGenerator.ROW_SERIALIZER,
+                TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER,
                 serializer,
                 reader,
                 kv -> kv);
@@ -137,7 +137,7 @@ public class DataFileTest {
                 data,
                 actualMetas,
                 projectedKeySerializer,
-                TestKeyValueGenerator.ROW_SERIALIZER,
+                TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER,
                 serializer,
                 fileReader,
                 kv ->
@@ -208,7 +208,7 @@ public class DataFileTest {
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
         return new DataFileWriter.Factory(
                         TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         // normal format will buffer changes in memory and we can't determine
                         // if the written file size is really larger than suggested, so we use a
                         // special format which flushes for every added element
@@ -224,7 +224,7 @@ public class DataFileTest {
         DataFileReader.Factory factory =
                 new DataFileReader.Factory(
                         TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         new FlushingFileFormat(format),
                         pathFactory);
         if (keyProjection != null) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
index f436f552..2163ed62 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
@@ -102,7 +102,7 @@ public class DataFileTestDataGenerator {
         FieldStatsCollector keyStatsCollector =
                 new FieldStatsCollector(TestKeyValueGenerator.KEY_TYPE);
         FieldStatsCollector valueStatsCollector =
-                new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+                new FieldStatsCollector(TestKeyValueGenerator.DEFAULT_ROW_TYPE);
         long totalSize = 0;
         BinaryRowData minKey = null;
         BinaryRowData maxKey = null;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 5b9a85cf..535d63dd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -37,6 +36,7 @@ import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PART_TYPE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ManifestFile}. */
@@ -96,12 +96,11 @@ public class ManifestFileTest {
         FileStorePathFactory pathFactory =
                 new FileStorePathFactory(
                         new Path(path),
-                        TestKeyValueGenerator.PARTITION_TYPE,
+                        DEFAULT_PART_TYPE,
                         "default",
                         FileStoreOptions.FILE_FORMAT.defaultValue());
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
-        return new ManifestFile.Factory(
-                        TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory, suggestedFileSize)
+        return new ManifestFile.Factory(DEFAULT_PART_TYPE, avro, pathFactory, suggestedFileSize)
                 .create();
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 5db2111d..38a4d68d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -94,10 +94,10 @@ public class ManifestListTest {
         FileStorePathFactory pathFactory =
                 new FileStorePathFactory(
                         new Path(path),
-                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         "default",
                         FileStoreOptions.FILE_FORMAT.defaultValue());
-        return new ManifestList.Factory(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory)
+        return new ManifestList.Factory(TestKeyValueGenerator.DEFAULT_PART_TYPE, avro, pathFactory)
                 .create();
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
index db5dbd8c..659ae3b8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
@@ -83,7 +83,7 @@ public class ManifestTestDataGenerator {
                 !entries.isEmpty(), "Manifest entries are empty. Invalid test data.");
 
         FieldStatsCollector collector =
-                new FieldStatsCollector(TestKeyValueGenerator.PARTITION_TYPE);
+                new FieldStatsCollector(TestKeyValueGenerator.DEFAULT_PART_TYPE);
 
         long numAddedFiles = 0;
         long numDeletedFiles = 0;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index fca49e69..ed270788 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -263,6 +263,24 @@ public class FileStoreCommitTest {
         assertThat(snapshot.getLogOffsets()).isEqualTo(expected);
     }
 
+    @Test
+    public void testCommitEmpty() throws Exception {
+        TestFileStore store = createStore(false, 2);
+        Snapshot snapshot =
+                store.commitData(
+                                generateDataList(10),
+                                gen::getPartition,
+                                kv -> 0,
+                                Collections.emptyMap())
+                        .get(0);
+
+        store.commitData(
+                Collections.emptyList(), gen::getPartition, kv -> 0, Collections.emptyMap());
+        Path snapshotDir = store.pathFactory().snapshotDirectory();
+
+        assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(snapshot.id());
+    }
+
     private TestFileStore createStore(boolean failing) {
         return createStore(failing, 1);
     }
@@ -276,9 +294,9 @@ public class FileStoreCommitTest {
                 "avro",
                 root,
                 numBucket,
-                TestKeyValueGenerator.PARTITION_TYPE,
+                TestKeyValueGenerator.DEFAULT_PART_TYPE,
                 TestKeyValueGenerator.KEY_TYPE,
-                TestKeyValueGenerator.ROW_TYPE,
+                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                 new DeduplicateMergeFunction());
     }
 
@@ -310,7 +328,10 @@ public class FileStoreCommitTest {
 
         LOG.debug("========== Beginning of " + name + " ==========");
         for (KeyValue kv : supplier.get()) {
-            LOG.debug(kv.toString(TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE));
+            LOG.debug(
+                    kv.toString(
+                            TestKeyValueGenerator.KEY_TYPE,
+                            TestKeyValueGenerator.DEFAULT_ROW_TYPE));
         }
         LOG.debug("========== End of " + name + " ==========");
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 00022240..25516460 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -58,9 +58,9 @@ public class FileStoreExpireTest {
                         "avro",
                         tempDir.toString(),
                         1,
-                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         new DeduplicateMergeFunction());
         pathFactory = store.pathFactory();
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
index c9aff153..e236d517 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
@@ -133,9 +133,9 @@ public class FileStoreReadTest {
         }
         TestFileStore store =
                 createStore(
-                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         new DeduplicateMergeFunction());
 
         RowDataSerializer projectedValueSerializer =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index e590b76b..e2ca788c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -65,9 +65,9 @@ public class FileStoreScanTest {
                         "avro",
                         tempDir.toString(),
                         NUM_BUCKETS,
-                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         new DeduplicateMergeFunction());
         pathFactory = store.pathFactory();
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 7ed31a87..c65ef8bb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -38,6 +38,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
+
 /** Testing {@link Thread}s to perform concurrent commits. */
 public class TestCommitThread extends Thread {
 
@@ -114,7 +116,7 @@ public class TestCommitThread extends Thread {
                 committable,
                 () ->
                         commit.overwrite(
-                                TestKeyValueGenerator.toPartitionMap(partition),
+                                TestKeyValueGenerator.toPartitionMap(partition, MULTI_PARTITIONED),
                                 committable,
                                 Collections.emptyMap()));
     }