Posted to commits@flink.apache.org by cz...@apache.org on 2023/01/06 08:17:10 UTC

[flink-table-store] branch release-0.3 updated: [FLINK-30573] Fix bug that Table Store dedicated compact job may skip some records when checkpoint interval is long

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.3 by this push:
     new 1ba8f372 [FLINK-30573] Fix bug that Table Store dedicated compact job may skip some records when checkpoint interval is long
1ba8f372 is described below

commit 1ba8f372a05d64e7370f7dcbb65b77e23079f8ef
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Jan 6 16:06:50 2023 +0800

    [FLINK-30573] Fix bug that Table Store dedicated compact job may skip some records when checkpoint interval is long
    
    This closes #460
    
    (cherry picked from commit 8fdbbd4e8c22a82d79509cab0add4a6aa7331672)
---
 .../table/store/connector/sink/CompactorSink.java  |   2 +-
 .../store/connector/sink/CompactorSinkBuilder.java |   8 +-
 .../sink/FullChangelogStoreSinkWrite.java          | 122 ++++++++++++--------
 ...ava => OffsetRowDataHashStreamPartitioner.java} |  23 ++--
 .../store/connector/sink/StoreCompactOperator.java |  30 ++++-
 .../table/store/connector/sink/StoreSinkWrite.java |   4 +
 .../store/connector/sink/StoreSinkWriteImpl.java   |  20 ++++
 .../connector/source/CompactorSourceBuilder.java   |  16 ++-
 .../ChangelogWithKeyFileStoreTableITCase.java      | 123 +++++++++++++++++----
 .../connector/action/CompactActionITCase.java      |  43 +++----
 .../store/connector/sink/CompactorSinkITCase.java  |  41 +++++--
 .../connector/source/CompactorSourceITCase.java    | 120 ++++++++++----------
 .../source/FileStoreSourceSplitGeneratorTest.java  |   1 +
 .../source/FileStoreSourceSplitSerializerTest.java |   2 +-
 .../source/TestChangelogDataReadWrite.java         |   3 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../table/store/file/append/AppendOnlyWriter.java  |   5 +
 .../store/file/mergetree/MergeTreeWriter.java      |   5 +
 .../file/mergetree/compact/CompactStrategy.java    |   7 +-
 .../file/operation/AbstractFileStoreWrite.java     | 102 ++++++++++-------
 .../file/operation/AppendOnlyFileStoreWrite.java   |  27 ++++-
 .../table/store/file/operation/FileStoreWrite.java |  15 +++
 .../file/operation/KeyValueFileStoreWrite.java     |  20 +++-
 .../store/file/operation/MemoryFileStoreWrite.java |   8 +-
 .../table/store/file/utils/ObjectSerializer.java   |  17 +++
 .../flink/table/store/file/utils/RecordWriter.java |  10 ++
 .../flink/table/store/table/sink/TableWrite.java   |  12 ++
 .../table/store/table/sink/TableWriteImpl.java     |   7 ++
 .../store/table/source/AbstractDataTableScan.java  |  28 +++--
 .../flink/table/store/table/source/DataSplit.java  |  28 ++++-
 .../ContinuousCompactorFollowUpScanner.java        |   5 +-
 .../table/store/table/system/BucketsTable.java     |  48 ++++++--
 .../flink/table/store/file/TestFileStore.java      |  11 +-
 .../file/operation/KeyValueFileStoreReadTest.java  |   5 +-
 .../store/file/operation/TestCommitThread.java     |   6 +-
 .../flink/table/store/table/source/SplitTest.java  |   8 +-
 .../ContinuousCompactorFollowUpScannerTest.java    |  32 ++++--
 .../ContinuousCompactorStartingScannerTest.java    |   4 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../table/store/tests/FlinkActionsE2eTest.java     |   2 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../store/mapred/TableStoreInputSplitTest.java     |   1 +
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 46 files changed, 691 insertions(+), 294 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
index 4de06f34..c234858f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.util.function.SerializableFunction;
 
-/** {@link FlinkSink} for stand-alone compact jobs. */
+/** {@link FlinkSink} for dedicated compact jobs. */
 public class CompactorSink extends FlinkSink {
 
     private static final long serialVersionUID = 1L;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
index aee210ae..c57063a2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
@@ -50,9 +50,11 @@ public class CompactorSinkBuilder {
     }
 
     public DataStreamSink<?> build() {
-        HashRowStreamPartitioner partitioner =
-                new HashRowStreamPartitioner(
-                        BucketsTable.rowType(table.schema().logicalPartitionType()));
+        OffsetRowDataHashStreamPartitioner partitioner =
+                new OffsetRowDataHashStreamPartitioner(
+                        BucketsTable.partitionWithBucketRowType(
+                                table.schema().logicalPartitionType()),
+                        1);
         PartitionTransformation<RowData> partitioned =
                 new PartitionTransformation<>(input.getTransformation(), partitioner);
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
index 3d7b0395..1b12b677 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
@@ -38,15 +38,18 @@ import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 /**
  * {@link StoreSinkWrite} for {@link CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
@@ -54,6 +57,8 @@ import java.util.TreeMap;
  */
 public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FullChangelogStoreSinkWrite.class);
+
     private final long fullCompactionThresholdMs;
 
     private final Set<Tuple2<BinaryRowData, Integer>> currentWrittenBuckets;
@@ -64,7 +69,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
     private final NavigableMap<Long, Long> firstWriteMs;
     private final ListState<Tuple2<Long, Long>> firstWriteMsState;
 
-    private Long snapshotIdentifierToCheck;
+    private transient TreeSet<Long> commitIdentifiersToCheck;
 
     public FullChangelogStoreSinkWrite(
             FileStoreTable table,
@@ -116,7 +121,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
         firstWriteMs = new TreeMap<>();
         firstWriteMsState.get().forEach(t -> firstWriteMs.put(t.f0, t.f1));
 
-        snapshotIdentifierToCheck = null;
+        commitIdentifiersToCheck = new TreeSet<>();
     }
 
     @Override
@@ -134,6 +139,10 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
     }
 
     private void touchBucket(BinaryRowData partition, int bucket) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("touch partition {}, bucket {}", partition, bucket);
+        }
+
         // partition is a reused BinaryRowData
         // we first check if the tuple exists to minimize copying
         if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
@@ -148,15 +157,9 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
     @Override
     public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
             throws IOException {
-        if (snapshotIdentifierToCheck != null) {
-            Optional<Snapshot> snapshot = findSnapshot(snapshotIdentifierToCheck);
-            if (snapshot.map(s -> s.commitKind() == Snapshot.CommitKind.COMPACT).orElse(false)) {
-                writtenBuckets.headMap(snapshotIdentifierToCheck, true).clear();
-                firstWriteMs.headMap(snapshotIdentifierToCheck, true).clear();
-                snapshotIdentifierToCheck = null;
-            }
-        }
+        checkSuccessfulFullCompaction();
 
+        // check what buckets we've modified during this checkpoint interval
         if (!currentWrittenBuckets.isEmpty()) {
             writtenBuckets
                     .computeIfAbsent(checkpointId, k -> new HashSet<>())
@@ -166,8 +169,18 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
             currentFirstWriteMs = null;
         }
 
-        if (snapshotIdentifierToCheck == null // wait for last forced full compaction to complete
-                && !writtenBuckets.isEmpty() // there should be something to compact
+        if (LOG.isDebugEnabled()) {
+            for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> checkpointIdAndBuckets :
+                    writtenBuckets.entrySet()) {
+                LOG.debug(
+                        "Written buckets for checkpoint #{} are:", checkpointIdAndBuckets.getKey());
+                for (Tuple2<BinaryRowData, Integer> bucket : checkpointIdAndBuckets.getValue()) {
+                    LOG.debug("  * partition {}, bucket {}", bucket.f0, bucket.f1);
+                }
+            }
+        }
+
+        if (!writtenBuckets.isEmpty() // there should be something to compact
                 && System.currentTimeMillis() - firstWriteMs.firstEntry().getValue()
                         >= fullCompactionThresholdMs // time without full compaction exceeds
         ) {
@@ -175,58 +188,75 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
         }
 
         if (doCompaction) {
-            snapshotIdentifierToCheck = checkpointId;
-            Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new HashSet<>();
-            for (Set<Tuple2<BinaryRowData, Integer>> buckets : writtenBuckets.values()) {
-                for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
-                    if (compactedBuckets.contains(bucket)) {
-                        continue;
-                    }
-                    compactedBuckets.add(bucket);
-                    try {
-                        write.compact(bucket.f0, bucket.f1, true);
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Submit full compaction for checkpoint #{}", checkpointId);
             }
+            submitFullCompaction();
+            commitIdentifiersToCheck.add(checkpointId);
         }
 
         return super.prepareCommit(doCompaction, checkpointId);
     }
 
-    private Optional<Snapshot> findSnapshot(long identifierToCheck) {
-        // TODO We need a mechanism to do timeout recovery in case of snapshot expiration.
+    private void checkSuccessfulFullCompaction() {
         SnapshotManager snapshotManager = table.snapshotManager();
         Long latestId = snapshotManager.latestSnapshotId();
         if (latestId == null) {
-            return Optional.empty();
+            return;
         }
-
         Long earliestId = snapshotManager.earliestSnapshotId();
         if (earliestId == null) {
-            return Optional.empty();
+            return;
         }
 
-        // We must find the snapshot whose identifier is exactly `identifierToCheck`.
-        // We can't just compare with the latest snapshot identifier by this user (like what we do
-        // in `AbstractFileStoreWrite`), because even if the latest snapshot identifier is newer,
-        // compact changes may still be discarded due to commit conflicts.
         for (long id = latestId; id >= earliestId; id--) {
             Snapshot snapshot = snapshotManager.snapshot(id);
-            if (!snapshot.commitUser().equals(commitUser)) {
-                continue;
-            }
-            if (snapshot.commitIdentifier() == identifierToCheck) {
-                return Optional.of(snapshot);
-            } else if (snapshot.commitIdentifier() < identifierToCheck) {
-                // We're searching from new snapshots to old ones. So if we find an older
-                // identifier, we can make sure our target snapshot will never occur.
-                return Optional.empty();
+            if (snapshot.commitUser().equals(commitUser)
+                    && snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                long commitIdentifier = snapshot.commitIdentifier();
+                if (commitIdentifiersToCheck.contains(commitIdentifier)) {
+                    // We found a full compaction snapshot triggered by `submitFullCompaction`
+                    // method.
+                    //
+                    // Because `submitFullCompaction` compacts all buckets in `writtenBuckets`,
+                    // a successful commit indicates that all previous buckets have been
+                    // compacted.
+                    //
+                    // We must make sure that the compact snapshot is triggered by
+                    // `submitFullCompaction`, because normal compaction may also trigger full
+                    // compaction, but that only compacts a specific bucket, not all buckets
+                    // recorded in `writtenBuckets`.
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Found full compaction snapshot #{} with identifier {}",
+                                id,
+                                commitIdentifier);
+                    }
+                    writtenBuckets.headMap(commitIdentifier, true).clear();
+                    firstWriteMs.headMap(commitIdentifier, true).clear();
+                    commitIdentifiersToCheck.headSet(commitIdentifier).clear();
+                    break;
+                }
             }
         }
+    }
 
-        return Optional.empty();
+    private void submitFullCompaction() {
+        Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new HashSet<>();
+        writtenBuckets.forEach(
+                (checkpointId, buckets) -> {
+                    for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
+                        if (compactedBuckets.contains(bucket)) {
+                            continue;
+                        }
+                        compactedBuckets.add(bucket);
+                        try {
+                            write.compact(bucket.f0, bucket.f1, true);
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
     }
 
     @Override
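
The rewrite above replaces the single `snapshotIdentifierToCheck` with a `TreeSet` of commit
identifiers, so the writer can have several forced full compactions in flight and only clears its
bookkeeping once a COMPACT snapshot committed by this writer carries one of the recorded
identifiers. A minimal standalone sketch of that bookkeeping, using plain `java.util` types (the
class name `CompactionTracker` and the `String` bucket key are illustrative stand-ins, not Table
Store types):

    import java.util.HashSet;
    import java.util.NavigableMap;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.TreeSet;

    /** Sketch of tracking which forced full compactions have been confirmed. */
    class CompactionTracker {

        // buckets written per checkpoint, keyed by checkpoint id
        private final NavigableMap<Long, Set<String>> writtenBuckets = new TreeMap<>();
        // checkpoint ids for which a forced full compaction was submitted
        private final TreeSet<Long> identifiersToCheck = new TreeSet<>();

        void recordWrite(long checkpointId, String bucket) {
            writtenBuckets.computeIfAbsent(checkpointId, k -> new HashSet<>()).add(bucket);
        }

        void submitFullCompaction(long checkpointId) {
            // the submitted compaction covers every bucket currently in writtenBuckets
            identifiersToCheck.add(checkpointId);
        }

        /** Called when a COMPACT snapshot committed by this writer is discovered. */
        void onCompactSnapshot(long commitIdentifier) {
            if (identifiersToCheck.contains(commitIdentifier)) {
                // everything written up to this identifier has now been fully compacted
                writtenBuckets.headMap(commitIdentifier, true).clear();
                identifiersToCheck.headSet(commitIdentifier, true).clear();
            }
        }
    }

With this shape, confirming the compaction for checkpoint N also clears the buckets recorded for
all earlier checkpoints, which keeps the state bounded even when checkpoints are far apart.
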
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/OffsetRowDataHashStreamPartitioner.java
similarity index 70%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/OffsetRowDataHashStreamPartitioner.java
index f69443a0..d9ed04cd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/OffsetRowDataHashStreamPartitioner.java
@@ -24,28 +24,37 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.utils.OffsetRowData;
 import org.apache.flink.table.types.logical.RowType;
 
-/** {@link StreamPartitioner} to partition {@link RowData} according to its hash value. */
-public class HashRowStreamPartitioner extends StreamPartitioner<RowData> {
+/**
+ * {@link StreamPartitioner} to partition {@link RowData} according to the hash value of its
+ * {@link OffsetRowData} view.
+ */
+public class OffsetRowDataHashStreamPartitioner extends StreamPartitioner<RowData> {
 
-    private final RowType rowType;
+    private final RowType offsetRowType;
+    private final int offset;
 
+    private transient OffsetRowData offsetRowData;
     private transient RowDataSerializer serializer;
 
-    public HashRowStreamPartitioner(RowType rowType) {
-        this.rowType = rowType;
+    public OffsetRowDataHashStreamPartitioner(RowType offsetRowType, int offset) {
+        this.offsetRowType = offsetRowType;
+        this.offset = offset;
     }
 
     @Override
     public void setup(int numberOfChannels) {
         super.setup(numberOfChannels);
-        serializer = new RowDataSerializer(rowType);
+        this.offsetRowData = new OffsetRowData(offsetRowType.getFieldCount(), offset);
+        serializer = new RowDataSerializer(offsetRowType);
     }
 
     @Override
     public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
-        int hash = serializer.toBinaryRow(record.getInstance().getValue()).hashCode();
+        RowData rowData = record.getInstance().getValue();
+        int hash = serializer.toBinaryRow(offsetRowData.replace(rowData)).hashCode();
         return Math.abs(hash) % numberOfChannels;
     }
 
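
With this change the rows fed to the partitioner carry a leading snapshot-id field (the
`StoreCompactOperator` change below reads it via `record.getLong(0)`), so hashing the whole row
would send the same partition/bucket to different subtasks from one snapshot to the next. Hashing
an offset view of the row skips that field. A rough illustration of the idea with a plain
`Object[]` row instead of Flink's `RowData` (the class name is made up):

    import java.util.Arrays;

    /** Sketch: choose a channel from the hash of a row while skipping leading fields. */
    class OffsetHashChannelSelector {

        private final int offset;            // number of leading fields to ignore, e.g. 1 for the snapshot id
        private final int numberOfChannels;

        OffsetHashChannelSelector(int offset, int numberOfChannels) {
            this.offset = offset;
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel(Object[] row) {
            // hash only the stable part of the row (partition fields, bucket, ...)
            int hash = Arrays.hashCode(Arrays.copyOfRange(row, offset, row.length));
            return Math.abs(hash) % numberOfChannels;
        }
    }
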
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index 9ac2957a..fdab6d1f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
 import org.apache.flink.table.store.file.utils.OffsetRowData;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.util.Preconditions;
@@ -47,6 +49,7 @@ public class StoreCompactOperator extends PrepareCommitOperator {
     private transient StoreSinkWrite write;
     private transient RowDataSerializer partitionSerializer;
     private transient OffsetRowData reusedPartition;
+    private transient DataFileMetaSerializer dataFileMetaSerializer;
 
     public StoreCompactOperator(
             FileStoreTable table,
@@ -73,17 +76,34 @@ public class StoreCompactOperator extends PrepareCommitOperator {
     public void open() throws Exception {
         super.open();
         partitionSerializer = new RowDataSerializer(table.schema().logicalPartitionType());
-        reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 0);
+        reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 1);
+        dataFileMetaSerializer = new DataFileMetaSerializer();
     }
 
     @Override
     public void processElement(StreamRecord<RowData> element) throws Exception {
-        RowData partitionAndBucket = element.getValue();
-        reusedPartition.replace(partitionAndBucket);
+        RowData record = element.getValue();
+
+        long snapshotId = record.getLong(0);
+
+        reusedPartition.replace(record);
         BinaryRowData partition = partitionSerializer.toBinaryRow(reusedPartition).copy();
-        int bucket = partitionAndBucket.getInt(partitionSerializer.getArity());
 
-        write.compact(partition, bucket, !isStreaming);
+        int bucket = record.getInt(partitionSerializer.getArity() + 1);
+
+        byte[] serializedFiles = record.getBinary(partitionSerializer.getArity() + 2);
+        List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(serializedFiles);
+
+        if (isStreaming) {
+            write.notifyNewFiles(snapshotId, partition, bucket, files);
+            write.compact(partition, bucket, false);
+        } else {
+            Preconditions.checkArgument(
+                    files.isEmpty(),
+                    "A batch compact job is not concerned with which files are compacted. "
+                            + "It only needs to know which buckets are compacted.");
+            write.compact(partition, bucket, true);
+        }
     }
 
     @Override
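
Given the new record layout of (snapshot id, partition fields, bucket, serialized data files), the
streaming path registers the reported files and then triggers a normal compaction, while the batch
path only needs the bucket and forces a full compaction. A simplified sketch of that dispatch,
with a hypothetical `Writer` interface standing in for `StoreSinkWrite` and plain `Object[]` rows:

    import java.util.Arrays;
    import java.util.List;

    /** Sketch of how a compact operator could interpret incoming records. */
    class CompactDispatcher {

        /** Hypothetical stand-in for the sink write used by the operator. */
        interface Writer {
            void notifyNewFiles(long snapshotId, Object[] partition, int bucket, List<String> files);

            void compact(Object[] partition, int bucket, boolean fullCompaction);
        }

        private final int partitionArity;
        private final boolean isStreaming;
        private final Writer writer;

        CompactDispatcher(int partitionArity, boolean isStreaming, Writer writer) {
            this.partitionArity = partitionArity;
            this.isStreaming = isStreaming;
            this.writer = writer;
        }

        /** Record layout: (snapshot_id, partition fields..., bucket, file list). */
        @SuppressWarnings("unchecked")
        void process(Object[] record) {
            long snapshotId = (Long) record[0];
            Object[] partition = Arrays.copyOfRange(record, 1, 1 + partitionArity);
            int bucket = (Integer) record[1 + partitionArity];
            List<String> files = (List<String>) record[2 + partitionArity];

            if (isStreaming) {
                // register the files reported by the source, then run a normal compaction
                writer.notifyNewFiles(snapshotId, partition, bucket, files);
                writer.compact(partition, bucket, false);
            } else {
                // batch mode only needs the bucket; force a full compaction
                writer.compact(partition, bucket, true);
            }
        }
    }
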
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
index fe3465d9..f44e0de4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 
@@ -39,6 +40,9 @@ interface StoreSinkWrite {
 
     void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
 
+    void notifyNewFiles(
+            long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files);
+
     List<Committable> prepareCommit(boolean doCompaction, long checkpointId) throws IOException;
 
     void snapshotState(StateSnapshotContext context) throws Exception;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
index 374616ec..75186dac 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriteImpl.java
@@ -23,11 +23,15 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.TableWrite;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -35,6 +39,8 @@ import java.util.List;
 /** Default implementation of {@link StoreSinkWrite}. This writer does not have states. */
 public class StoreSinkWriteImpl implements StoreSinkWrite {
 
+    private static final Logger LOG = LoggerFactory.getLogger(StoreSinkWriteImpl.class);
+
     protected final FileStoreTable table;
     protected final String commitUser;
     protected final TableWrite write;
@@ -84,6 +90,20 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         write.compact(partition, bucket, fullCompaction);
     }
 
+    @Override
+    public void notifyNewFiles(
+            long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Receive {} new files from snapshot {}, partition {}, bucket {}",
+                    files.size(),
+                    snapshotId,
+                    partition,
+                    bucket);
+        }
+        write.notifyNewFiles(snapshotId, partition, bucket, files);
+    }
+
     @Override
     public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
             throws IOException {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
index 649daa7e..43cbb3f0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
@@ -44,12 +44,12 @@ import java.util.Map;
 
 /**
  * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
- * ContinuousFileStoreSource}. This is for stand-alone compactor jobs.
+ * ContinuousFileStoreSource}. This is for dedicated compactor jobs.
  */
 public class CompactorSourceBuilder {
 
     private final String tableIdentifier;
-    private final BucketsTable bucketsTable;
+    private final FileStoreTable table;
 
     private boolean isContinuous = false;
     private StreamExecutionEnvironment env;
@@ -57,7 +57,7 @@ public class CompactorSourceBuilder {
 
     public CompactorSourceBuilder(String tableIdentifier, FileStoreTable table) {
         this.tableIdentifier = tableIdentifier;
-        this.bucketsTable = new BucketsTable(table);
+        this.table = table;
     }
 
     public CompactorSourceBuilder withContinuousMode(boolean isContinuous) {
@@ -79,13 +79,16 @@ public class CompactorSourceBuilder {
         return this;
     }
 
-    private Source<RowData, ?, ?> buildSource() {
+    private Source<RowData, ?, ?> buildSource(BucketsTable bucketsTable) {
         Predicate partitionPredicate = null;
         if (specifiedPartitions != null) {
+            // This predicate is based on the row type of the original table, not the buckets
+            // table, because TableScan in BucketsTable is the same as in FileStoreTable and the
+            // partition filter is applied by the scan.
             partitionPredicate =
                     PredicateBuilder.or(
                             specifiedPartitions.stream()
-                                    .map(p -> PredicateConverter.fromMap(p, bucketsTable.rowType()))
+                                    .map(p -> PredicateConverter.fromMap(p, table.rowType()))
                                     .toArray(Predicate[]::new));
         }
 
@@ -122,9 +125,10 @@ public class CompactorSourceBuilder {
             throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
         }
 
+        BucketsTable bucketsTable = new BucketsTable(table, isContinuous);
         LogicalType produceType = bucketsTable.rowType();
         return env.fromSource(
-                buildSource(),
+                buildSource(bucketsTable),
                 WatermarkStrategy.noWatermarks(),
                 tableIdentifier + "-compact-source",
                 InternalTypeInfo.of(produceType));
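
The builder now derives the partition predicate from the original table's row type and OR-s the
per-partition specs together. The same shape can be expressed with `java.util.function.Predicate`
over a map-based row; this is a loose sketch, not the connector's `Predicate`/`PredicateBuilder`
API:

    import java.util.List;
    import java.util.Map;
    import java.util.function.Predicate;

    /** Sketch: combine partition specs into one row filter (an OR of per-spec ANDs). */
    class PartitionSpecFilter {

        static Predicate<Map<String, String>> fromSpecs(List<Map<String, String>> specs) {
            // a row matches if any spec matches, and a spec matches if all its entries match
            return row -> specs.stream().anyMatch(
                    spec -> spec.entrySet().stream()
                            .allMatch(e -> e.getValue().equals(row.get(e.getKey()))));
        }
    }
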
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
index b759040e..21a73a05 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.store.connector;
 
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -105,24 +108,21 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
         return TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
     }
 
-    private TableEnvironment createStreamingTableEnvironment() {
+    private TableEnvironment createStreamingTableEnvironment(int checkpointIntervalMs) {
         TableEnvironment sEnv =
                 TableEnvironment.create(
                         EnvironmentSettings.newInstance().inStreamingMode().build());
         // set checkpoint interval to a random number to emulate different speed of commit
         sEnv.getConfig()
                 .getConfiguration()
-                .set(
-                        CHECKPOINTING_INTERVAL,
-                        Duration.ofMillis(ThreadLocalRandom.current().nextInt(900) + 100));
+                .set(CHECKPOINTING_INTERVAL, Duration.ofMillis(checkpointIntervalMs));
         return sEnv;
     }
 
-    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+    private StreamExecutionEnvironment createStreamExecutionEnvironment(int checkpointIntervalMs) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-        env.getCheckpointConfig()
-                .setCheckpointInterval(ThreadLocalRandom.current().nextInt(900) + 100);
+        env.getCheckpointConfig().setCheckpointInterval(checkpointIntervalMs);
         return env;
     }
 
@@ -132,7 +132,8 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
 
     @Test
     public void testFullCompactionTriggerInterval() throws Exception {
-        TableEnvironment sEnv = createStreamingTableEnvironment();
+        TableEnvironment sEnv =
+                createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100);
         sEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
@@ -206,40 +207,103 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
         Assert.assertEquals(expected, actual);
     }
 
+    @Test
+    public void testFullCompactionWithLongCheckpointInterval() throws Exception {
+        // create table
+        TableEnvironment bEnv = createBatchTableEnvironment();
+        bEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        bEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG testCatalog WITH ('type'='table-store', 'warehouse'='%s')",
+                        path));
+        bEnv.executeSql("USE CATALOG testCatalog");
+        bEnv.executeSql(
+                "CREATE TABLE T ("
+                        + "  k INT,"
+                        + "  v INT,"
+                        + "  PRIMARY KEY (k) NOT ENFORCED"
+                        + ") WITH ("
+                        + "  'bucket' = '1',"
+                        + "  'changelog-producer' = 'full-compaction',"
+                        + "  'changelog-producer.compaction-interval' = '2s',"
+                        + "  'write.compaction-skip' = 'true'"
+                        + ")");
+
+        // run select job
+        TableEnvironment sEnv = createStreamingTableEnvironment(100);
+        sEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        sEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG testCatalog WITH ('type'='table-store', 'warehouse'='%s')",
+                        path));
+        sEnv.executeSql("USE CATALOG testCatalog");
+        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
+
+        // run compact job
+        StreamExecutionEnvironment env = createStreamExecutionEnvironment(2000);
+        env.setParallelism(1);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        FlinkActions.compact(new Path(path + "/default.db/T")).build(env);
+        JobClient client = env.executeAsync();
+
+        // write records for a while
+        long startMs = System.currentTimeMillis();
+        int currentKey = 0;
+        while (System.currentTimeMillis() - startMs <= 10000) {
+            currentKey++;
+            bEnv.executeSql(
+                            String.format(
+                                    "INSERT INTO T VALUES (%d, %d)", currentKey, currentKey * 100))
+                    .await();
+        }
+
+        Assert.assertEquals(JobStatus.RUNNING, client.getJobStatus().get());
+
+        for (int i = 1; i <= currentKey; i++) {
+            Assert.assertTrue(it.hasNext());
+            Assert.assertEquals(String.format("+I[%d, %d]", i, i * 100), it.next().toString());
+        }
+        it.close();
+    }
+
     // ------------------------------------------------------------------------
     //  Random Tests
     // ------------------------------------------------------------------------
 
-    @Test
+    @Test(timeout = 600000)
     public void testNoChangelogProducerBatchRandom() throws Exception {
         TableEnvironment bEnv = createBatchTableEnvironment();
         testNoChangelogProducerRandom(bEnv, 1, false);
     }
 
-    @Test
+    @Test(timeout = 600000)
     public void testNoChangelogProducerStreamingRandom() throws Exception {
-        TableEnvironment sEnv = createStreamingTableEnvironment();
         ThreadLocalRandom random = ThreadLocalRandom.current();
+        TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
         testNoChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
     }
 
-    @Test
+    @Test(timeout = 600000)
     public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
         TableEnvironment bEnv = createBatchTableEnvironment();
         testFullCompactionChangelogProducerRandom(bEnv, 1, false);
     }
 
-    @Test
+    @Test(timeout = 600000)
     public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
-        TableEnvironment sEnv = createStreamingTableEnvironment();
         ThreadLocalRandom random = ThreadLocalRandom.current();
+        TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
         testFullCompactionChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
     }
 
-    @Test
+    @Test(timeout = 600000)
     public void testStandAloneFullCompactJobRandom() throws Exception {
-        TableEnvironment sEnv = createStreamingTableEnvironment();
         ThreadLocalRandom random = ThreadLocalRandom.current();
+        TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
         testStandAloneFullCompactJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
     }
 
@@ -259,38 +323,49 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
 
     private void testFullCompactionChangelogProducerRandom(
             TableEnvironment tEnv, int numProducers, boolean enableFailure) throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
         testRandom(
                 tEnv,
                 numProducers,
                 enableFailure,
                 "'bucket' = '4',"
+                        + String.format(
+                                "'write-buffer-size' = '%s',",
+                                random.nextBoolean() ? "512kb" : "1mb")
                         + "'changelog-producer' = 'full-compaction',"
                         + "'changelog-producer.compaction-interval' = '1s'");
 
         // sleep for a random amount of time to check
         // if we can first read complete records then read incremental records correctly
-        Thread.sleep(ThreadLocalRandom.current().nextInt(5000));
+        Thread.sleep(random.nextInt(5000));
 
         checkFullCompactionTestResult(numProducers);
     }
 
     private void testStandAloneFullCompactJobRandom(
             TableEnvironment tEnv, int numProducers, boolean enableConflicts) throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
         testRandom(
                 tEnv,
                 numProducers,
                 false,
                 "'bucket' = '4',"
+                        + String.format(
+                                "'write-buffer-size' = '%s',",
+                                random.nextBoolean() ? "512kb" : "1mb")
                         + "'changelog-producer' = 'full-compaction',"
-                        + "'changelog-producer.compaction-interval' = '1s',"
+                        + "'changelog-producer.compaction-interval' = '2s',"
                         + "'write.compaction-skip' = 'true'");
 
         // sleep for a random amount of time to check
-        // if stand-alone compactor job can find first snapshot to compact correctly
-        Thread.sleep(ThreadLocalRandom.current().nextInt(2500));
+        // if dedicated compactor job can find first snapshot to compact correctly
+        Thread.sleep(random.nextInt(2500));
 
         for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
-            StreamExecutionEnvironment env = createStreamExecutionEnvironment();
+            StreamExecutionEnvironment env =
+                    createStreamExecutionEnvironment(random.nextInt(1900) + 100);
             env.setParallelism(2);
             FlinkActions.compact(new Path(path + "/default.db/T")).build(env);
             env.executeAsync();
@@ -298,13 +373,13 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
 
         // sleep for a random amount of time to check
         // if we can first read complete records then read incremental records correctly
-        Thread.sleep(ThreadLocalRandom.current().nextInt(2500));
+        Thread.sleep(random.nextInt(2500));
 
         checkFullCompactionTestResult(numProducers);
     }
 
     private void checkFullCompactionTestResult(int numProducers) throws Exception {
-        TableEnvironment sEnv = createStreamingTableEnvironment();
+        TableEnvironment sEnv = createStreamingTableEnvironment(100);
         sEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
@@ -385,7 +460,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
                                 + (LIMIT + NUM_PARTS * NUM_KEYS)
                                 + "',"
                                 + "  'rows-per-second' = '"
-                                + (LIMIT / 5 + ThreadLocalRandom.current().nextInt(LIMIT / 10))
+                                + (LIMIT / 20 + ThreadLocalRandom.current().nextInt(LIMIT / 20))
                                 + "'"
                                 + ")")
                 .await();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
index 332d04c7..803ea251 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
@@ -69,9 +70,9 @@ public class CompactActionITCase extends AbstractTestBase {
                         DataTypes.INT().getLogicalType(),
                         DataTypes.INT().getLogicalType(),
                         DataTypes.INT().getLogicalType(),
-                        DataTypes.INT().getLogicalType()
+                        DataTypes.STRING().getLogicalType()
                     },
-                    new String[] {"dt", "hh", "k", "v"});
+                    new String[] {"k", "v", "hh", "dt"});
 
     private Path tablePath;
     private String commitUser;
@@ -92,14 +93,14 @@ public class CompactActionITCase extends AbstractTestBase {
         TableWrite write = table.newWrite(commitUser);
         TableCommit commit = table.newCommit(commitUser);
 
-        write.write(rowData(20221208, 15, 1, 100));
-        write.write(rowData(20221208, 16, 1, 100));
-        write.write(rowData(20221209, 15, 1, 100));
+        write.write(rowData(1, 100, 15, StringData.fromString("20221208")));
+        write.write(rowData(1, 100, 16, StringData.fromString("20221208")));
+        write.write(rowData(1, 100, 15, StringData.fromString("20221209")));
         commit.commit(0, write.prepareCommit(true, 0));
 
-        write.write(rowData(20221208, 15, 2, 200));
-        write.write(rowData(20221208, 16, 2, 200));
-        write.write(rowData(20221209, 15, 2, 200));
+        write.write(rowData(2, 200, 15, StringData.fromString("20221208")));
+        write.write(rowData(2, 200, 16, StringData.fromString("20221208")));
+        write.write(rowData(2, 200, 15, StringData.fromString("20221209")));
         commit.commit(1, write.prepareCommit(true, 1));
 
         Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -145,9 +146,9 @@ public class CompactActionITCase extends AbstractTestBase {
         TableCommit commit = table.newCommit(commitUser);
 
         // base records
-        write.write(rowData(20221208, 15, 1, 100));
-        write.write(rowData(20221208, 16, 1, 100));
-        write.write(rowData(20221209, 15, 1, 100));
+        write.write(rowData(1, 100, 15, StringData.fromString("20221208")));
+        write.write(rowData(1, 100, 16, StringData.fromString("20221208")));
+        write.write(rowData(1, 100, 15, StringData.fromString("20221209")));
         commit.commit(0, write.prepareCommit(true, 0));
 
         Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -180,12 +181,12 @@ public class CompactActionITCase extends AbstractTestBase {
         Assert.assertEquals(2, (long) plan.snapshotId);
         List<String> actual = getResult(table.newRead(), plan.splits());
         actual.sort(String::compareTo);
-        Assert.assertEquals(Arrays.asList("+I 20221208|15|1|100", "+I 20221209|15|1|100"), actual);
+        Assert.assertEquals(Arrays.asList("+I 1|100|15|20221208", "+I 1|100|15|20221209"), actual);
 
         // incremental records
-        write.write(rowData(20221208, 15, 1, 101));
-        write.write(rowData(20221208, 16, 1, 101));
-        write.write(rowData(20221209, 15, 1, 101));
+        write.write(rowData(1, 101, 15, StringData.fromString("20221208")));
+        write.write(rowData(1, 101, 16, StringData.fromString("20221208")));
+        write.write(rowData(1, 101, 15, StringData.fromString("20221209")));
         commit.commit(1, write.prepareCommit(true, 1));
 
         write.close();
@@ -205,10 +206,10 @@ public class CompactActionITCase extends AbstractTestBase {
         actual.sort(String::compareTo);
         Assert.assertEquals(
                 Arrays.asList(
-                        "+U 20221208|15|1|101",
-                        "+U 20221209|15|1|101",
-                        "-U 20221208|15|1|100",
-                        "-U 20221209|15|1|100"),
+                        "+U 1|101|15|20221208",
+                        "+U 1|101|15|20221209",
+                        "-U 1|100|15|20221208",
+                        "-U 1|100|15|20221209"),
                 actual);
     }
 
@@ -246,12 +247,12 @@ public class CompactActionITCase extends AbstractTestBase {
 
     private String rowDataToString(RowData rowData) {
         return String.format(
-                "%s %d|%d|%d|%d",
+                "%s %d|%d|%d|%s",
                 rowData.getRowKind().shortString(),
                 rowData.getInt(0),
                 rowData.getInt(1),
                 rowData.getInt(2),
-                rowData.getInt(3));
+                rowData.getString(3).toString());
     }
 
     private FileStoreTable createFileStoreTable(Map<String, String> options) throws Exception {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
index 774074cb..8477cf5c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.connector.source.CompactorSourceBuilder;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -47,6 +49,9 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 /** IT cases for {@link CompactorSinkBuilder} and {@link CompactorSink}. */
@@ -58,9 +63,9 @@ public class CompactorSinkITCase extends AbstractTestBase {
                         DataTypes.INT().getLogicalType(),
                         DataTypes.INT().getLogicalType(),
                         DataTypes.INT().getLogicalType(),
-                        DataTypes.INT().getLogicalType()
+                        DataTypes.STRING().getLogicalType()
                     },
-                    new String[] {"dt", "hh", "k", "v"});
+                    new String[] {"k", "v", "hh", "dt"});
 
     private Path tablePath;
     private String commitUser;
@@ -78,14 +83,14 @@ public class CompactorSinkITCase extends AbstractTestBase {
         TableWrite write = table.newWrite(commitUser);
         TableCommit commit = table.newCommit(commitUser);
 
-        write.write(rowData(20221208, 15, 1, 100));
-        write.write(rowData(20221208, 16, 1, 100));
-        write.write(rowData(20221209, 15, 1, 100));
+        write.write(rowData(1, 100, 15, StringData.fromString("20221208")));
+        write.write(rowData(1, 100, 16, StringData.fromString("20221208")));
+        write.write(rowData(1, 100, 15, StringData.fromString("20221209")));
         commit.commit(0, write.prepareCommit(true, 0));
 
-        write.write(rowData(20221208, 15, 2, 200));
-        write.write(rowData(20221208, 16, 2, 200));
-        write.write(rowData(20221209, 15, 2, 200));
+        write.write(rowData(2, 200, 15, StringData.fromString("20221208")));
+        write.write(rowData(2, 200, 16, StringData.fromString("20221208")));
+        write.write(rowData(2, 200, 15, StringData.fromString("20221209")));
         commit.commit(1, write.prepareCommit(true, 1));
 
         Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -97,8 +102,14 @@ public class CompactorSinkITCase extends AbstractTestBase {
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        CompactorSourceBuilder sourceBuilder =
+                new CompactorSourceBuilder(tablePath.toString(), table);
         DataStreamSource<RowData> source =
-                env.fromElements(rowData(20221208, 15, 0), rowData(20221209, 15, 0));
+                sourceBuilder
+                        .withEnv(env)
+                        .withContinuousMode(false)
+                        .withPartitions(getSpecifiedPartitions())
+                        .build();
         new CompactorSinkBuilder(table).withInput(source).build();
         env.execute();
 
@@ -119,6 +130,18 @@ public class CompactorSinkITCase extends AbstractTestBase {
         }
     }
 
+    private List<Map<String, String>> getSpecifiedPartitions() {
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put("dt", "20221208");
+        partition1.put("hh", "15");
+
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put("dt", "20221209");
+        partition2.put("hh", "15");
+
+        return Arrays.asList(partition1, partition2);
+    }
+
     private GenericRowData rowData(Object... values) {
         return GenericRowData.of(values);
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
index f7ebba03..49d037ca 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
@@ -24,8 +24,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -42,6 +44,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -60,10 +63,12 @@ public class CompactorSourceITCase extends AbstractTestBase {
                     new LogicalType[] {
                         DataTypes.INT().getLogicalType(),
                         DataTypes.INT().getLogicalType(),
-                        DataTypes.INT().getLogicalType(),
+                        DataTypes.STRING().getLogicalType(),
                         DataTypes.INT().getLogicalType()
                     },
-                    new String[] {"dt", "hh", "k", "v"});
+                    new String[] {"k", "v", "dt", "hh"});
+
+    private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
 
     private Path tablePath;
     private String commitUser;
@@ -80,12 +85,12 @@ public class CompactorSourceITCase extends AbstractTestBase {
         TableWrite write = table.newWrite(commitUser);
         TableCommit commit = table.newCommit(commitUser);
 
-        write.write(rowData(20221208, 15, 1, 1510));
-        write.write(rowData(20221208, 16, 2, 1620));
+        write.write(rowData(1, 1510, StringData.fromString("20221208"), 15));
+        write.write(rowData(2, 1620, StringData.fromString("20221208"), 16));
         commit.commit(0, write.prepareCommit(true, 0));
 
-        write.write(rowData(20221208, 15, 1, 1511));
-        write.write(rowData(20221209, 15, 1, 1510));
+        write.write(rowData(1, 1511, StringData.fromString("20221208"), 15));
+        write.write(rowData(1, 1510, StringData.fromString("20221209"), 15));
         commit.commit(1, write.prepareCommit(true, 1));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -102,7 +107,10 @@ public class CompactorSourceITCase extends AbstractTestBase {
         }
         assertThat(actual)
                 .hasSameElementsAs(
-                        Arrays.asList("+I 20221208|15|0", "+I 20221208|16|0", "+I 20221209|15|0"));
+                        Arrays.asList(
+                                "+I 2|20221208|15|0|0",
+                                "+I 2|20221208|16|0|0",
+                                "+I 2|20221209|15|0|0"));
 
         write.close();
         commit.close();
@@ -115,22 +123,22 @@ public class CompactorSourceITCase extends AbstractTestBase {
         TableWrite write = table.newWrite(commitUser);
         TableCommit commit = table.newCommit(commitUser);
 
-        write.write(rowData(20221208, 15, 1, 1510));
-        write.write(rowData(20221208, 16, 2, 1620));
+        write.write(rowData(1, 1510, StringData.fromString("20221208"), 15));
+        write.write(rowData(2, 1620, StringData.fromString("20221208"), 16));
         commit.commit(0, write.prepareCommit(true, 0));
 
-        write.write(rowData(20221208, 15, 1, 1511));
-        write.write(rowData(20221209, 15, 1, 1510));
-        write.compact(binaryRow(20221208, 15), 0, true);
-        write.compact(binaryRow(20221209, 15), 0, true);
+        write.write(rowData(1, 1511, StringData.fromString("20221208"), 15));
+        write.write(rowData(1, 1510, StringData.fromString("20221209"), 15));
+        write.compact(binaryRow("20221208", 15), 0, true);
+        write.compact(binaryRow("20221209", 15), 0, true);
         commit.commit(1, write.prepareCommit(true, 1));
 
-        write.write(rowData(20221208, 15, 2, 1520));
-        write.write(rowData(20221208, 16, 2, 1621));
+        write.write(rowData(2, 1520, StringData.fromString("20221208"), 15));
+        write.write(rowData(2, 1621, StringData.fromString("20221208"), 16));
         commit.commit(2, write.prepareCommit(true, 2));
 
-        write.write(rowData(20221208, 15, 1, 1512));
-        write.write(rowData(20221209, 16, 2, 1620));
+        write.write(rowData(1, 1512, StringData.fromString("20221208"), 15));
+        write.write(rowData(2, 1620, StringData.fromString("20221209"), 16));
         commit.commit(3, write.prepareCommit(true, 3));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -148,35 +156,22 @@ public class CompactorSourceITCase extends AbstractTestBase {
         assertThat(actual)
                 .hasSameElementsAs(
                         Arrays.asList(
-                                "+I 20221208|15|0",
-                                "+I 20221208|16|0",
-                                "+I 20221208|15|0",
-                                "+I 20221209|16|0"));
-
-        write.write(rowData(20221209, 15, 2, 1520));
-        write.write(rowData(20221208, 16, 1, 1510));
-        write.write(rowData(20221209, 15, 1, 1511));
+                                "+I 4|20221208|15|0|1",
+                                "+I 4|20221208|16|0|1",
+                                "+I 5|20221208|15|0|1",
+                                "+I 5|20221209|16|0|1"));
+
+        write.write(rowData(2, 1520, StringData.fromString("20221209"), 15));
+        write.write(rowData(1, 1510, StringData.fromString("20221208"), 16));
+        write.write(rowData(1, 1511, StringData.fromString("20221209"), 15));
         commit.commit(4, write.prepareCommit(true, 4));
 
         actual.clear();
         for (int i = 0; i < 2; i++) {
             actual.add(toString(it.next()));
         }
-        assertThat(actual).hasSameElementsAs(Arrays.asList("+I 20221208|16|0", "+I 20221209|15|0"));
-
-        write.close();
-        commit.close();
-
-        write = table.newWrite(commitUser).withOverwrite(true);
-        Map<String, String> partitionMap = new HashMap<>();
-        partitionMap.put("dt", "20221209");
-        partitionMap.put("hh", "16");
-        commit = table.newCommit(commitUser).withOverwritePartition(partitionMap);
-        write.write(rowData(20221209, 16, 1, 1512));
-        write.write(rowData(20221209, 16, 2, 1622));
-        commit.commit(5, write.prepareCommit(true, 5));
-
-        assertThat(toString(it.next())).isEqualTo("+I 20221209|16|0");
+        assertThat(actual)
+                .hasSameElementsAs(Arrays.asList("+I 6|20221208|16|0|1", "+I 6|20221209|15|0|1"));
 
         write.close();
         commit.close();
@@ -189,10 +184,10 @@ public class CompactorSourceITCase extends AbstractTestBase {
                 true,
                 getSpecifiedPartitions(),
                 Arrays.asList(
-                        "+I 20221208|16|0",
-                        "+I 20221209|15|0",
-                        "+I 20221208|16|0",
-                        "+I 20221209|15|0"));
+                        "+I 1|20221208|16|0|1",
+                        "+I 2|20221209|15|0|1",
+                        "+I 3|20221208|16|0|1",
+                        "+I 3|20221209|15|0|1"));
     }
 
     @Test
@@ -200,7 +195,7 @@ public class CompactorSourceITCase extends AbstractTestBase {
         testPartitionSpec(
                 false,
                 getSpecifiedPartitions(),
-                Arrays.asList("+I 20221208|16|0", "+I 20221209|15|0"));
+                Arrays.asList("+I 3|20221208|16|0|0", "+I 3|20221209|15|0|0"));
     }
 
     private List<Map<String, String>> getSpecifiedPartitions() {
@@ -224,17 +219,17 @@ public class CompactorSourceITCase extends AbstractTestBase {
         TableWrite write = table.newWrite(commitUser);
         TableCommit commit = table.newCommit(commitUser);
 
-        write.write(rowData(20221208, 15, 1, 1510));
-        write.write(rowData(20221208, 16, 2, 1620));
+        write.write(rowData(1, 1510, StringData.fromString("20221208"), 15));
+        write.write(rowData(2, 1620, StringData.fromString("20221208"), 16));
         commit.commit(0, write.prepareCommit(true, 0));
 
-        write.write(rowData(20221208, 15, 2, 1520));
-        write.write(rowData(20221209, 15, 2, 1520));
+        write.write(rowData(2, 1520, StringData.fromString("20221208"), 15));
+        write.write(rowData(2, 1520, StringData.fromString("20221209"), 15));
         commit.commit(1, write.prepareCommit(true, 1));
 
-        write.write(rowData(20221208, 15, 1, 1511));
-        write.write(rowData(20221208, 16, 1, 1610));
-        write.write(rowData(20221209, 15, 1, 1510));
+        write.write(rowData(1, 1511, StringData.fromString("20221208"), 15));
+        write.write(rowData(1, 1610, StringData.fromString("20221208"), 16));
+        write.write(rowData(1, 1510, StringData.fromString("20221209"), 15));
         commit.commit(2, write.prepareCommit(true, 2));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -258,22 +253,31 @@ public class CompactorSourceITCase extends AbstractTestBase {
     }
 
     private String toString(RowData rowData) {
+        int numFiles;
+        try {
+            numFiles = dataFileMetaSerializer.deserializeList(rowData.getBinary(4)).size();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
         return String.format(
-                "%s %d|%d|%d",
+                "%s %d|%s|%d|%d|%d",
                 rowData.getRowKind().shortString(),
-                rowData.getInt(0),
-                rowData.getInt(1),
-                rowData.getInt(2));
+                rowData.getLong(0),
+                rowData.getString(1).toString(),
+                rowData.getInt(2),
+                rowData.getInt(3),
+                numFiles);
     }
 
     private GenericRowData rowData(Object... values) {
         return GenericRowData.of(values);
     }
 
-    private BinaryRowData binaryRow(int dt, int hh) {
+    private BinaryRowData binaryRow(String dt, int hh) {
         BinaryRowData b = new BinaryRowData(2);
         BinaryRowWriter writer = new BinaryRowWriter(b);
-        writer.writeInt(0, dt);
+        writer.writeString(0, StringData.fromString(dt));
         writer.writeInt(1, hh);
         writer.complete();
         return b;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 00ccf65f..8d7b1937 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -78,6 +78,7 @@ public class FileStoreSourceSplitGeneratorTest {
                 };
         List<DataSplit> scanSplits =
                 AbstractDataTableScan.generateSplits(
+                        1L,
                         false,
                         Collections::singletonList,
                         plan.groupByPartFiles(plan.files(FileKind.ADD)));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index 3ecc9983..f95c604a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -118,7 +118,7 @@ public class FileStoreSourceSplitSerializerTest {
             boolean isIncremental,
             long recordsToSkip) {
         return new FileStoreSourceSplit(
-                id, new DataSplit(partition, bucket, files, isIncremental), recordsToSkip);
+                id, new DataSplit(1L, partition, bucket, files, isIncremental), recordsToSkip);
     }
 
     private static FileStoreSourceSplit serializeAndDeserialize(FileStoreSourceSplit split)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 3df55343..42d7cc13 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -184,7 +184,8 @@ public class TestChangelogDataReadWrite {
                                 null, // not used, we only create an empty writer
                                 options,
                                 EXTRACTOR)
-                        .createEmptyWriter(partition, bucket, service);
+                        .createEmptyWriterContainer(partition, bucket, service)
+                        .writer;
         ((MemoryOwner) writer)
                 .setMemoryPool(
                         new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
diff --git a/flink-table-store-connector/src/test/resources/log4j2-test.properties b/flink-table-store-connector/src/test/resources/log4j2-test.properties
index 835c2ec9..1b3980d1 100644
--- a/flink-table-store-connector/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-connector/src/test/resources/log4j2-test.properties
@@ -25,4 +25,4 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
index dfa94931..264f2c66 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
@@ -98,6 +98,11 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
         flushWriter(true, fullCompaction);
     }
 
+    @Override
+    public void addNewFiles(List<DataFileMeta> files) {
+        files.forEach(compactManager::addNewFile);
+    }
+
     @Override
     public CommitIncrement prepareCommit(boolean blocking) throws Exception {
         flushWriter(false, false);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index f17c8b0f..8432d109 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -143,6 +143,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         flushWriteBuffer(true, fullCompaction);
     }
 
+    @Override
+    public void addNewFiles(List<DataFileMeta> files) {
+        files.forEach(compactManager::addNewFile);
+    }
+
     @Override
     public long memoryOccupancy() {
         return writeBuffer.memoryOccupancy();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
index 58e5902a..ba363ef4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
@@ -41,6 +41,11 @@ public interface CompactStrategy {
     /** Pick a compaction unit consisting of all existing files. */
     static Optional<CompactUnit> pickFullCompaction(int numLevels, List<LevelSortedRun> runs) {
         int maxLevel = numLevels - 1;
-        return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+        if (runs.size() == 1 && runs.get(0).level() == maxLevel) {
+            // only 1 sorted run on the max level, nothing to compact
+            return Optional.empty();
+        } else {
+            return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+        }
     }
 }
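
With this guard, a full compaction is skipped when a bucket already consists of a single sorted
run on the max level, so there is nothing left to merge. A minimal, self-contained sketch of that
condition; "Run" is a simplified stand-in for LevelSortedRun and the Optional payload stands in
for CompactUnit (both are assumptions for illustration, not the real classes):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;

    // Sketch of the new pickFullCompaction guard with simplified stand-in types.
    public class PickFullCompactionSketch {

        static class Run {
            final int level;

            Run(int level) {
                this.level = level;
            }
        }

        static Optional<List<Run>> pickFullCompaction(int numLevels, List<Run> runs) {
            int maxLevel = numLevels - 1;
            if (runs.size() == 1 && runs.get(0).level == maxLevel) {
                // Already a single sorted run on the max level: a full compaction
                // would only rewrite the same data, so there is nothing to do.
                return Optional.empty();
            }
            // Otherwise compact all runs into the max level.
            return Optional.of(runs);
        }

        public static void main(String[] args) {
            // One run already on the max level -> nothing to compact.
            System.out.println(
                    pickFullCompaction(4, Collections.singletonList(new Run(3))).isPresent()); // false
            // A lower-level run still exists -> full compaction is picked.
            System.out.println(
                    pickFullCompaction(4, Arrays.asList(new Run(0), new Run(3))).isPresent()); // true
        }
    }
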
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index 98212081..c498d56d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -54,12 +54,12 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
 
     private final String commitUser;
-    private final SnapshotManager snapshotManager;
+    protected final SnapshotManager snapshotManager;
     private final FileStoreScan scan;
 
     @Nullable protected IOManager ioManager;
 
-    protected final Map<BinaryRowData, Map<Integer, WriterWithCommit<T>>> writers;
+    protected final Map<BinaryRowData, Map<Integer, WriterContainer<T>>> writers;
     private final ExecutorService compactExecutor;
 
     private boolean overwrite = false;
@@ -83,14 +83,13 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
         return this;
     }
 
-    protected List<DataFileMeta> scanExistingFileMetas(BinaryRowData partition, int bucket) {
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+    protected List<DataFileMeta> scanExistingFileMetas(
+            Long snapshotId, BinaryRowData partition, int bucket) {
         List<DataFileMeta> existingFileMetas = new ArrayList<>();
-        if (latestSnapshotId != null) {
+        if (snapshotId != null) {
             // Concat all the DataFileMeta of existing files into existingFileMetas.
-            scan.withSnapshot(latestSnapshotId)
-                    .withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
-                    .plan().files().stream()
+            scan.withSnapshot(snapshotId).withPartitionFilter(Collections.singletonList(partition))
+                    .withBucket(bucket).plan().files().stream()
                     .map(ManifestEntry::file)
                     .forEach(existingFileMetas::add);
         }
@@ -103,14 +102,32 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
 
     @Override
     public void write(BinaryRowData partition, int bucket, T data) throws Exception {
-        RecordWriter<T> writer = getWriter(partition, bucket);
+        RecordWriter<T> writer = getWriterWrapper(partition, bucket).writer;
         writer.write(data);
     }
 
     @Override
     public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
             throws Exception {
-        getWriter(partition, bucket).compact(fullCompaction);
+        getWriterWrapper(partition, bucket).writer.compact(fullCompaction);
+    }
+
+    @Override
+    public void notifyNewFiles(
+            long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
+        WriterContainer<T> writerContainer = getWriterWrapper(partition, bucket);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Get extra compact files for partition {}, bucket {}. Extra snapshot {}, base snapshot {}.\nFiles: {}",
+                    partition,
+                    bucket,
+                    snapshotId,
+                    writerContainer.baseSnapshotId,
+                    files);
+        }
+        if (snapshotId > writerContainer.baseSnapshotId) {
+            writerContainer.writer.addNewFiles(files);
+        }
     }
 
     @Override
@@ -142,20 +159,20 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
 
         List<FileCommittable> result = new ArrayList<>();
 
-        Iterator<Map.Entry<BinaryRowData, Map<Integer, WriterWithCommit<T>>>> partIter =
+        Iterator<Map.Entry<BinaryRowData, Map<Integer, WriterContainer<T>>>> partIter =
                 writers.entrySet().iterator();
         while (partIter.hasNext()) {
-            Map.Entry<BinaryRowData, Map<Integer, WriterWithCommit<T>>> partEntry = partIter.next();
+            Map.Entry<BinaryRowData, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
             BinaryRowData partition = partEntry.getKey();
-            Iterator<Map.Entry<Integer, WriterWithCommit<T>>> bucketIter =
+            Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter =
                     partEntry.getValue().entrySet().iterator();
             while (bucketIter.hasNext()) {
-                Map.Entry<Integer, WriterWithCommit<T>> entry = bucketIter.next();
+                Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
                 int bucket = entry.getKey();
-                WriterWithCommit<T> writerWithCommit = entry.getValue();
+                WriterContainer<T> writerContainer = entry.getValue();
 
                 RecordWriter.CommitIncrement increment =
-                        writerWithCommit.writer.prepareCommit(blocking);
+                        writerContainer.writer.prepareCommit(blocking);
                 FileCommittable committable =
                         new FileCommittable(
                                 partition,
@@ -165,8 +182,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
                 result.add(committable);
 
                 if (committable.isEmpty()) {
-                    if (writerWithCommit.lastModifiedCommitIdentifier
-                            <= latestCommittedIdentifier) {
+                    if (writerContainer.lastModifiedCommitIdentifier <= latestCommittedIdentifier) {
                         // Clear writer if no update, and if its latest modification has committed.
                         //
                         // We need a mechanism to clear writers, otherwise there will be more and
@@ -178,14 +194,14 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
                                             + "while latest committed identifier is {}",
                                     partition,
                                     bucket,
-                                    writerWithCommit.lastModifiedCommitIdentifier,
+                                    writerContainer.lastModifiedCommitIdentifier,
                                     latestCommittedIdentifier);
                         }
-                        writerWithCommit.writer.close();
+                        writerContainer.writer.close();
                         bucketIter.remove();
                     }
                 } else {
-                    writerWithCommit.lastModifiedCommitIdentifier = commitIdentifier;
+                    writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
                 }
             }
 
@@ -199,56 +215,64 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
 
     @Override
     public void close() throws Exception {
-        for (Map<Integer, WriterWithCommit<T>> bucketWriters : writers.values()) {
-            for (WriterWithCommit<T> writerWithCommit : bucketWriters.values()) {
-                writerWithCommit.writer.close();
+        for (Map<Integer, WriterContainer<T>> bucketWriters : writers.values()) {
+            for (WriterContainer<T> writerContainer : bucketWriters.values()) {
+                writerContainer.writer.close();
             }
         }
         writers.clear();
         compactExecutor.shutdownNow();
     }
 
-    private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
-        Map<Integer, WriterWithCommit<T>> buckets = writers.get(partition);
+    private WriterContainer<T> getWriterWrapper(BinaryRowData partition, int bucket) {
+        Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
         if (buckets == null) {
             buckets = new HashMap<>();
             writers.put(partition.copy(), buckets);
         }
-        return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket)).writer;
+        return buckets.computeIfAbsent(
+                bucket, k -> createWriterContainer(partition.copy(), bucket));
     }
 
-    private WriterWithCommit<T> createWriter(BinaryRowData partition, int bucket) {
+    private WriterContainer<T> createWriterContainer(BinaryRowData partition, int bucket) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
         }
-        RecordWriter<T> writer =
+        WriterContainer<T> writerContainer =
                 overwrite
-                        ? createEmptyWriter(partition.copy(), bucket, compactExecutor)
-                        : createWriter(partition.copy(), bucket, compactExecutor);
-        notifyNewWriter(writer);
-        return new WriterWithCommit<>(writer);
+                        ? createEmptyWriterContainer(partition.copy(), bucket, compactExecutor)
+                        : createWriterContainer(partition.copy(), bucket, compactExecutor);
+        notifyNewWriter(writerContainer.writer);
+        return writerContainer;
     }
 
     protected void notifyNewWriter(RecordWriter<T> writer) {}
 
     /** Create a {@link RecordWriter} from partition and bucket. */
     @VisibleForTesting
-    public abstract RecordWriter<T> createWriter(
+    public abstract WriterContainer<T> createWriterContainer(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor);
 
     /** Create an empty {@link RecordWriter} from partition and bucket. */
     @VisibleForTesting
-    public abstract RecordWriter<T> createEmptyWriter(
+    public abstract WriterContainer<T> createEmptyWriterContainer(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor);
 
-    /** {@link RecordWriter} with identifier of its last modified commit. */
-    protected static class WriterWithCommit<T> {
+    /**
+     * {@link RecordWriter} together with the id of the snapshot it was created upon and the
+     * identifier of the commit that last modified it.
+     */
+    @VisibleForTesting
+    public static class WriterContainer<T> {
 
-        protected final RecordWriter<T> writer;
+        public final RecordWriter<T> writer;
+        private final long baseSnapshotId;
         private long lastModifiedCommitIdentifier;
 
-        private WriterWithCommit(RecordWriter<T> writer) {
+        protected WriterContainer(RecordWriter<T> writer, Long baseSnapshotId) {
             this.writer = writer;
+            this.baseSnapshotId =
+                    baseSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : baseSnapshotId;
             this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
         }
     }
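
The WriterContainer is the core of the fix: each writer remembers the snapshot it was created
upon (baseSnapshotId, falling back to Snapshot.FIRST_SNAPSHOT_ID - 1 when no snapshot exists yet),
and notifyNewFiles only forwards files committed in strictly newer snapshots, so files already
picked up when the writer was created are not added twice. A self-contained sketch of that
bookkeeping, with plain strings standing in for DataFileMeta and a trivial stand-in for
RecordWriter (both simplifications are assumptions for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch of the WriterContainer bookkeeping with simplified stand-in types.
    public class WriterContainerSketch {

        static final long FIRST_SNAPSHOT_ID = 1;

        static class Writer {
            final List<String> compactInput = new ArrayList<>();

            void addNewFiles(List<String> files) {
                compactInput.addAll(files);
            }
        }

        static class Container {
            final Writer writer = new Writer();
            final long baseSnapshotId;

            Container(Long latestSnapshotAtCreation) {
                // Same fallback as the real code: "no snapshot yet" behaves like an id
                // smaller than any real snapshot id.
                this.baseSnapshotId =
                        latestSnapshotAtCreation == null
                                ? FIRST_SNAPSHOT_ID - 1
                                : latestSnapshotAtCreation;
            }

            void notifyNewFiles(long snapshotId, List<String> files) {
                // Files up to the base snapshot were already picked up when the writer
                // was created; only strictly newer ones are forwarded for compaction.
                if (snapshotId > baseSnapshotId) {
                    writer.addNewFiles(files);
                }
            }
        }

        public static void main(String[] args) {
            Container container = new Container(3L); // writer created upon snapshot 3
            container.notifyNewFiles(3, Arrays.asList("file-from-snapshot-3")); // ignored
            container.notifyNewFiles(4, Arrays.asList("file-from-snapshot-4")); // forwarded
            System.out.println(container.writer.compactInput); // [file-from-snapshot-4]
        }
    }
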
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index bb7921c4..261b7c3f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -81,16 +81,25 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
     }
 
     @Override
-    public RecordWriter<RowData> createWriter(
+    public WriterContainer<RowData> createWriterContainer(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-        return createWriter(
-                partition, bucket, scanExistingFileMetas(partition, bucket), compactExecutor);
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        RecordWriter<RowData> writer =
+                createWriter(
+                        partition,
+                        bucket,
+                        scanExistingFileMetas(latestSnapshotId, partition, bucket),
+                        compactExecutor);
+        return new WriterContainer<>(writer, latestSnapshotId);
     }
 
     @Override
-    public RecordWriter<RowData> createEmptyWriter(
+    public WriterContainer<RowData> createEmptyWriterContainer(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-        return createWriter(partition, bucket, Collections.emptyList(), compactExecutor);
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        RecordWriter<RowData> writer =
+                createWriter(partition, bucket, Collections.emptyList(), compactExecutor);
+        return new WriterContainer<>(writer, latestSnapshotId);
     }
 
     private RecordWriter<RowData> createWriter(
@@ -140,7 +149,13 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
                             new LongCounter(toCompact.get(0).minSequenceNumber()));
             rewriter.write(
                     new RecordReaderIterator<>(
-                            read.createReader(new DataSplit(partition, bucket, toCompact, false))));
+                            read.createReader(
+                                    new DataSplit(
+                                            0L /* unused */,
+                                            partition,
+                                            bucket,
+                                            toCompact,
+                                            false))));
             rewriter.close();
             return rewriter.result();
         };
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 919a1eaf..87105b2c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.operation;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -65,6 +66,20 @@ public interface FileStoreWrite<T> {
      */
     void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
 
+    /**
+     * Notify that some new files have been created at the given snapshot in the given bucket.
+     *
+     * <p>Most likely, these files were created by another job. Currently, this method is only
+     * used by the dedicated compact job to discover files created by writer jobs.
+     *
+     * @param snapshotId the snapshot id where new files are created
+     * @param partition the partition where new files are created
+     * @param bucket the bucket where new files are created
+     * @param files the new files themselves
+     */
+    void notifyNewFiles(
+            long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files);
+
     /**
      * Prepare commit in the write.
      *
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index ec6f8453..2a01106b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -107,16 +107,25 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
     }
 
     @Override
-    public RecordWriter<KeyValue> createWriter(
+    public WriterContainer<KeyValue> createWriterContainer(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-        return createMergeTreeWriter(
-                partition, bucket, scanExistingFileMetas(partition, bucket), compactExecutor);
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        RecordWriter<KeyValue> writer =
+                createMergeTreeWriter(
+                        partition,
+                        bucket,
+                        scanExistingFileMetas(latestSnapshotId, partition, bucket),
+                        compactExecutor);
+        return new WriterContainer<>(writer, latestSnapshotId);
     }
 
     @Override
-    public RecordWriter<KeyValue> createEmptyWriter(
+    public WriterContainer<KeyValue> createEmptyWriterContainer(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-        return createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        RecordWriter<KeyValue> writer =
+                createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
+        return new WriterContainer<>(writer, latestSnapshotId);
     }
 
     private MergeTreeWriter createMergeTreeWriter(
@@ -130,7 +139,6 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                     partition,
                     bucket,
                     restoreFiles);
-            new RuntimeException().printStackTrace();
         }
 
         KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
index 77bd7d7b..65788b4f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
@@ -51,7 +51,7 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T>
     }
 
     private Iterator<MemoryOwner> memoryOwners() {
-        Iterator<Map<Integer, WriterWithCommit<T>>> iterator = writers.values().iterator();
+        Iterator<Map<Integer, WriterContainer<T>>> iterator = writers.values().iterator();
         return Iterators.concat(
                 new Iterator<Iterator<MemoryOwner>>() {
                     @Override
@@ -63,10 +63,10 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T>
                     public Iterator<MemoryOwner> next() {
                         return Iterators.transform(
                                 iterator.next().values().iterator(),
-                                writerWithCommit ->
-                                        writerWithCommit == null
+                                writerContainer ->
+                                        writerContainer == null
                                                 ? null
-                                                : (MemoryOwner) (writerWithCommit.writer));
+                                                : (MemoryOwner) (writerContainer.writer));
                     }
                 });
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ObjectSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ObjectSerializer.java
index 19421d84..de9b0255 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ObjectSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ObjectSerializer.java
@@ -19,12 +19,16 @@
 package org.apache.flink.table.store.file.utils;
 
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -80,6 +84,13 @@ public abstract class ObjectSerializer<T> implements Serializable {
         }
     }
 
+    public final byte[] serializeList(List<T> records) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
+        serializeList(records, view);
+        return baos.toByteArray();
+    }
+
     /** De-serializes a record list from the given source input view. */
     public final List<T> deserializeList(DataInputView source) throws IOException {
         int size = source.readInt();
@@ -90,6 +101,12 @@ public abstract class ObjectSerializer<T> implements Serializable {
         return records;
     }
 
+    public final List<T> deserializeList(byte[] bytes) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(bais);
+        return deserializeList(view);
+    }
+
     /** Convert a {@link T} to {@link RowData}. */
     public abstract RowData toRow(T record);
 
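
The two overloads above only wrap the existing view-based serializeList/deserializeList around
in-memory streams, so that a whole list can travel as a single byte[] (this is how BucketsTable
ships its _FILES column below). A small sketch of the same wrapping pattern, using a plain
List<String> instead of an ObjectSerializer subclass; the class name is illustrative and only
flink-core is needed on the classpath:

    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch of the byte[] round-trip pattern, shown for strings instead of DataFileMeta.
    public class ListBytesRoundTrip {

        static byte[] serializeList(List<String> records) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
            view.writeInt(records.size());
            for (String record : records) {
                view.writeUTF(record);
            }
            return baos.toByteArray();
        }

        static List<String> deserializeList(byte[] bytes) throws IOException {
            DataInputViewStreamWrapper view =
                    new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes));
            int size = view.readInt();
            List<String> records = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                records.add(view.readUTF());
            }
            return records;
        }

        public static void main(String[] args) throws IOException {
            byte[] bytes = serializeList(Arrays.asList("data-file-0", "data-file-1"));
            System.out.println(deserializeList(bytes)); // [data-file-0, data-file-1]
        }
    }
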
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
index bedac510..3dc1594c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -19,8 +19,11 @@
 package org.apache.flink.table.store.file.utils;
 
 import org.apache.flink.table.store.file.io.CompactIncrement;
+import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.NewFilesIncrement;
 
+import java.util.List;
+
 /**
  * The {@code RecordWriter} is responsible for writing data and handling in-progress files used to
  * write yet un-staged data. The incremental files ready to commit is returned to the system by the
@@ -41,6 +44,13 @@ public interface RecordWriter<T> {
      */
     void compact(boolean fullCompaction) throws Exception;
 
+    /**
+     * Add files to the internal {@link org.apache.flink.table.store.file.compact.CompactManager}.
+     *
+     * @param files files to add
+     */
+    void addNewFiles(List<DataFileMeta> files);
+
     /**
      * Prepare for a commit.
      *
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index dce0eae4..13bfdd0c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.table.store.table.sink;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.io.DataFileMeta;
 
 import java.util.List;
 
@@ -41,6 +43,16 @@ public interface TableWrite extends AutoCloseable {
 
     void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
 
+    /**
+     * Notify that some new files have been created at the given snapshot in the given bucket.
+     *
+     * <p>Most likely, these files were created by another job. Currently, this method is only
+     * used by the dedicated compact job to discover files created by writer jobs.
+     */
+    @Internal
+    void notifyNewFiles(
+            long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files);
+
     List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifier) throws Exception;
 
     void close() throws Exception;
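
A hypothetical caller illustrating the intended call order of the new method in a dedicated
compact job: first report the files that other jobs committed at a snapshot, then trigger the
compaction for that bucket. The helper name and parameters are illustrative only; the real
operator wiring is not shown in this excerpt, this is just a sketch of the TableWrite contract:

    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.store.file.io.DataFileMeta;
    import org.apache.flink.table.store.table.sink.TableWrite;

    import java.util.List;

    // Illustrative helper: make the writer aware of externally created files,
    // then compact the bucket so those files are included.
    public class CompactBucketSketch {

        static void compactBucket(
                TableWrite write,
                long snapshotId,
                BinaryRowData partition,
                int bucket,
                List<DataFileMeta> newFiles,
                boolean fullCompaction)
                throws Exception {
            // Files committed by writer jobs after this writer's base snapshot ...
            write.notifyNewFiles(snapshotId, partition, bucket, newFiles);
            // ... are then visible to the compaction triggered here.
            write.compact(partition, bucket, fullCompaction);
        }
    }
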
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index fb3c9b20..e5557650 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table.sink;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 
 import java.util.List;
@@ -75,6 +76,12 @@ public class TableWriteImpl<T> implements TableWrite {
         write.compact(partition, bucket, fullCompaction);
     }
 
+    @Override
+    public void notifyNewFiles(
+            long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
+        write.notifyNewFiles(snapshotId, partition, bucket, files);
+    }
+
     @Override
     public List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifier)
             throws Exception {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
index c545aeca..73f9fcc2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table.source;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
@@ -115,18 +116,20 @@ public abstract class AbstractDataTableScan implements DataTableScan {
     @Override
     public DataFilePlan plan() {
         FileStoreScan.Plan plan = scan.plan();
-        return new DataFilePlan(
-                plan.snapshotId(), generateSplits(plan.groupByPartFiles(plan.files(FileKind.ADD))));
-    }
-
-    private List<DataSplit> generateSplits(
-            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
-        return generateSplits(
-                scanKind != ScanKind.ALL, splitGenerator(pathFactory), groupedDataFiles);
+        Long snapshotId = plan.snapshotId();
+
+        List<DataSplit> splits =
+                generateSplits(
+                        snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId,
+                        scanKind != ScanKind.ALL,
+                        splitGenerator(pathFactory),
+                        plan.groupByPartFiles(plan.files(FileKind.ADD)));
+        return new DataFilePlan(snapshotId, splits);
     }
 
     @VisibleForTesting
     public static List<DataSplit> generateSplits(
+            long snapshotId,
             boolean isIncremental,
             SplitGenerator splitGenerator,
             Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
@@ -139,10 +142,15 @@ public abstract class AbstractDataTableScan implements DataTableScan {
                 int bucket = bucketEntry.getKey();
                 if (isIncremental) {
                     // Don't split when incremental
-                    splits.add(new DataSplit(partition, bucket, bucketEntry.getValue(), true));
+                    splits.add(
+                            new DataSplit(
+                                    snapshotId, partition, bucket, bucketEntry.getValue(), true));
                 } else {
                     splitGenerator.split(bucketEntry.getValue()).stream()
-                            .map(files -> new DataSplit(partition, bucket, files, false))
+                            .map(
+                                    files ->
+                                            new DataSplit(
+                                                    snapshotId, partition, bucket, files, false))
                             .forEach(splits::add);
                 }
             }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
index 8dfeac91..daba177c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
@@ -37,26 +37,40 @@ import java.util.Objects;
 /** Input splits. Needed by most batch computation engines. */
 public class DataSplit implements Split {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
+    private long snapshotId;
     private BinaryRowData partition;
     private int bucket;
     private List<DataFileMeta> files;
     private boolean isIncremental;
 
     public DataSplit(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
-        init(partition, bucket, files, isIncremental);
+            long snapshotId,
+            BinaryRowData partition,
+            int bucket,
+            List<DataFileMeta> files,
+            boolean isIncremental) {
+        init(snapshotId, partition, bucket, files, isIncremental);
     }
 
     private void init(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
+            long snapshotId,
+            BinaryRowData partition,
+            int bucket,
+            List<DataFileMeta> files,
+            boolean isIncremental) {
+        this.snapshotId = snapshotId;
         this.partition = partition;
         this.bucket = bucket;
         this.files = files;
         this.isIncremental = isIncremental;
     }
 
+    public long snapshotId() {
+        return snapshotId;
+    }
+
     public BinaryRowData partition() {
         return partition;
     }
@@ -108,10 +122,11 @@ public class DataSplit implements Split {
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
         DataSplit split = DataSplit.deserialize(new DataInputViewStreamWrapper(in));
-        init(split.partition, split.bucket, split.files, split.isIncremental);
+        init(split.snapshotId, split.partition, split.bucket, split.files, split.isIncremental);
     }
 
     public void serialize(DataOutputView out) throws IOException {
+        out.writeLong(snapshotId);
         SerializationUtils.serializeBinaryRow(partition, out);
         out.writeInt(bucket);
         out.writeInt(files.size());
@@ -123,6 +138,7 @@ public class DataSplit implements Split {
     }
 
     public static DataSplit deserialize(DataInputView in) throws IOException {
+        long snapshotId = in.readLong();
         BinaryRowData partition = SerializationUtils.deserializeBinaryRow(in);
         int bucket = in.readInt();
         int fileNumber = in.readInt();
@@ -131,6 +147,6 @@ public class DataSplit implements Split {
         for (int i = 0; i < fileNumber; i++) {
             files.add(dataFileSer.deserialize(in));
         }
-        return new DataSplit(partition, bucket, files, in.readBoolean());
+        return new DataSplit(snapshotId, partition, bucket, files, in.readBoolean());
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
index 74e0f141..b69b5c82 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
@@ -33,13 +33,12 @@ public class ContinuousCompactorFollowUpScanner implements FollowUpScanner {
 
     @Override
     public boolean shouldScanSnapshot(Snapshot snapshot) {
-        if (snapshot.commitKind() == Snapshot.CommitKind.APPEND
-                || snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+        if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
             return true;
         }
 
         LOG.debug(
-                "Next snapshot id {} is neither APPEND nor OVERWRITE, but is {}, check next one.",
+                "Next snapshot id {} is not APPEND, but is {}, check next one.",
                 snapshot.id(),
                 snapshot.commitKind());
         return false;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
index b94c3c32..8c222159 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.IteratorRecordReader;
 import org.apache.flink.table.store.file.utils.RecordReader;
@@ -34,8 +36,10 @@ import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,16 +50,18 @@ import java.util.Map;
 /**
  * A table to produce modified partitions and buckets for each snapshot.
  *
- * <p>Only used internally by stand-alone compact job sources.
+ * <p>Only used internally by dedicated compact job sources.
  */
 public class BucketsTable implements DataTable {
 
     private static final long serialVersionUID = 1L;
 
     private final FileStoreTable wrapped;
+    private final boolean isContinuous;
 
-    public BucketsTable(FileStoreTable wrapped) {
+    public BucketsTable(FileStoreTable wrapped, boolean isContinuous) {
         this.wrapped = wrapped;
+        this.isContinuous = isContinuous;
     }
 
     @Override
@@ -76,10 +82,17 @@ public class BucketsTable implements DataTable {
     @Override
     public RowType rowType() {
         RowType partitionType = wrapped.schema().logicalPartitionType();
-        return rowType(partitionType);
+
+        List<RowType.RowField> fields = new ArrayList<>();
+        fields.add(new RowType.RowField("_SNAPSHOT_ID", new BigIntType()));
+        fields.addAll(partitionType.getFields());
+        // same as ManifestEntry.schema
+        fields.add(new RowType.RowField("_BUCKET", new IntType()));
+        fields.add(new RowType.RowField("_FILES", new VarBinaryType()));
+        return new RowType(fields);
     }
 
-    public static RowType rowType(RowType partitionType) {
+    public static RowType partitionWithBucketRowType(RowType partitionType) {
         List<RowType.RowField> fields = new ArrayList<>(partitionType.getFields());
         // same with ManifestEntry.schema
         fields.add(new RowType.RowField("_BUCKET", new IntType()));
@@ -103,13 +116,16 @@ public class BucketsTable implements DataTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new BucketsTable(wrapped.copy(dynamicOptions));
+        return new BucketsTable(wrapped.copy(dynamicOptions), isContinuous);
     }
 
-    private static class BucketsRead implements TableRead {
+    private class BucketsRead implements TableRead {
+
+        private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
 
         @Override
         public TableRead withFilter(Predicate predicate) {
+            // filter is done by scan
             return this;
         }
 
@@ -125,9 +141,25 @@ public class BucketsTable implements DataTable {
             }
 
             DataSplit dataSplit = (DataSplit) split;
+
             RowData row =
-                    new JoinedRowData()
-                            .replace(dataSplit.partition(), GenericRowData.of(dataSplit.bucket()));
+                    new JoinedRowData(
+                            GenericRowData.of(dataSplit.snapshotId()), dataSplit.partition());
+            row = new JoinedRowData(row, GenericRowData.of(dataSplit.bucket()));
+
+            List<DataFileMeta> files = Collections.emptyList();
+            if (isContinuous) {
+                // Serialized files are only useful in streaming jobs.
+                // Batch compact jobs only run once, so they only need to know which buckets should
+                // be compacted and don't need to care about incremental new files.
+                files = dataSplit.files();
+            }
+            row =
+                    new JoinedRowData(
+                            row,
+                            GenericRowData.of(
+                                    (Object) dataFileMetaSerializer.serializeList(files)));
+
             return new IteratorRecordReader<>(Collections.singletonList(row).iterator());
         }
     }
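
With this change every BucketsTable row has the layout (_SNAPSHOT_ID BIGINT, <partition fields>,
_BUCKET INT, _FILES VARBINARY), and _FILES carries serialized DataFileMetas only in continuous
mode. An illustrative decoder for such a row, assuming the two-field partition type
(dt STRING, hh INT) used by the compactor tests above; column positions shift for other
partition schemas:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.io.DataFileMeta;
    import org.apache.flink.table.store.file.io.DataFileMetaSerializer;

    import java.io.IOException;
    import java.util.List;

    // Illustrative decoder for a BucketsTable row with partition type (dt STRING, hh INT).
    // Column order follows the new rowType(): _SNAPSHOT_ID, partition fields, _BUCKET, _FILES.
    public class BucketsRowDecoder {

        private final DataFileMetaSerializer fileSerializer = new DataFileMetaSerializer();

        public String describe(RowData row) throws IOException {
            long snapshotId = row.getLong(0);
            String dt = row.getString(1).toString();
            int hh = row.getInt(2);
            int bucket = row.getInt(3);
            // Empty in batch mode; in continuous mode it carries the new files of this
            // bucket at the given snapshot.
            List<DataFileMeta> files = fileSerializer.deserializeList(row.getBinary(4));
            return snapshotId + "|" + dt + "|" + hh + "|" + bucket
                    + " (" + files.size() + " new files)";
        }
    }
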
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index d8d46e03..376befa8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -193,10 +193,12 @@ public class TestFileStore extends KeyValueFileStore {
                                     ExecutorService service = Executors.newSingleThreadExecutor();
                                     RecordWriter<KeyValue> writer =
                                             emptyWriter
-                                                    ? write.createEmptyWriter(
-                                                            partition, bucket, service)
-                                                    : write.createWriter(
-                                                            partition, bucket, service);
+                                                    ? write.createEmptyWriterContainer(
+                                                                    partition, bucket, service)
+                                                            .writer
+                                                    : write.createWriterContainer(
+                                                                    partition, bucket, service)
+                                                            .writer;
                                     ((MemoryOwner) writer)
                                             .setMemoryPool(
                                                     new HeapMemorySegmentPool(
@@ -312,6 +314,7 @@ public class TestFileStore extends KeyValueFileStore {
                         new RecordReaderIterator<>(
                                 read.createReader(
                                         new DataSplit(
+                                                0L /* unused */,
                                                 entryWithPartition.getKey(),
                                                 entryWithBucket.getKey(),
                                                 entryWithBucket.getValue(),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index a191c20d..7f4ecf93 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -223,9 +223,9 @@ public class KeyValueFileStoreReadTest {
             throws Exception {
         store.commitData(data, partitionCalculator, kv -> 0);
         FileStoreScan scan = store.newScan();
+        Long snapshotId = store.snapshotManager().latestSnapshotId();
         Map<BinaryRowData, List<ManifestEntry>> filesGroupedByPartition =
-                scan.withSnapshot(store.snapshotManager().latestSnapshotId()).plan().files()
-                        .stream()
+                scan.withSnapshot(snapshotId).plan().files().stream()
                         .collect(Collectors.groupingBy(ManifestEntry::partition));
         KeyValueFileStoreRead read = store.newRead();
         if (keyProjection != null) {
@@ -241,6 +241,7 @@ public class KeyValueFileStoreReadTest {
             RecordReader<KeyValue> reader =
                     read.createReader(
                             new DataSplit(
+                                    snapshotId,
                                     entry.getKey(),
                                     0,
                                     entry.getValue().stream()
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index a3018088..03b9bc88 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -279,8 +279,10 @@ public class TestCommitThread extends Thread {
                         });
         MergeTreeWriter writer =
                 empty
-                        ? (MergeTreeWriter) write.createEmptyWriter(partition, 0, service)
-                        : (MergeTreeWriter) write.createWriter(partition, 0, service);
+                        ? (MergeTreeWriter)
+                                write.createEmptyWriterContainer(partition, 0, service).writer
+                        : (MergeTreeWriter)
+                                write.createWriterContainer(partition, 0, service).writer;
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(
                         WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes()));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
index d2cbb40f..5c1b815c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
@@ -44,7 +44,13 @@ public class SplitTest {
         for (int i = 0; i < ThreadLocalRandom.current().nextInt(10); i++) {
             files.add(gen.next().meta);
         }
-        DataSplit split = new DataSplit(data.partition, data.bucket, files, false);
+        DataSplit split =
+                new DataSplit(
+                        ThreadLocalRandom.current().nextLong(100),
+                        data.partition,
+                        data.bucket,
+                        files,
+                        false);
 
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         split.serialize(new DataOutputViewStreamWrapper(out));
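
The SplitTest change above shows the new shape of DataSplit: the id of the snapshot the split was planned from is now the first constructor argument and is exercised by the serialization round trip in the test. A minimal construction sketch mirroring the test (`snapshotId`, `partition`, `bucket` and `files` are placeholder values; the meaning of the trailing boolean is unchanged by this patch and not spelled out here):

    // Sketch only: argument order of the updated DataSplit constructor.
    DataSplit split =
            new DataSplit(
                    snapshotId,   // NEW: snapshot this split was planned from
                    partition,    // partition value (BinaryRowData)
                    bucket,       // bucket within the partition
                    files,        // List<DataFileMeta> data files to read
                    false);       // pre-existing flag, unchanged by this patch

    // Serialization round trip as exercised by SplitTest.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    split.serialize(new DataOutputViewStreamWrapper(out));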
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 66a21e3b..c23975c6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table.source.snapshot;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.TableCommit;
@@ -31,6 +32,8 @@ import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,6 +44,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link ContinuousCompactorFollowUpScanner}. */
 public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTestBase {
 
+    private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
+
     @Test
     public void testGetPlan() throws Exception {
         FileStoreTable table = createFileStoreTable();
@@ -74,7 +79,7 @@ public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTe
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
 
-        BucketsTable bucketsTable = new BucketsTable(table);
+        BucketsTable bucketsTable = new BucketsTable(table, true);
         DataTableScan scan = bucketsTable.newScan();
         TableRead read = bucketsTable.newRead();
         ContinuousCompactorFollowUpScanner scanner = new ContinuousCompactorFollowUpScanner();
@@ -85,7 +90,7 @@ public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTe
         DataTableScan.DataFilePlan plan = scanner.getPlan(1, scan);
         assertThat(plan.snapshotId).isEqualTo(1);
         assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|0", "+I 2|0"));
+                .hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
 
         snapshot = snapshotManager.snapshot(2);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
@@ -93,7 +98,7 @@ public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTe
         plan = scanner.getPlan(2, scan);
         assertThat(plan.snapshotId).isEqualTo(2);
         assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Collections.singletonList("+I 2|0"));
+                .hasSameElementsAs(Collections.singletonList("+I 2|2|0|1"));
 
         snapshot = snapshotManager.snapshot(3);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
@@ -101,17 +106,24 @@ public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTe
 
         snapshot = snapshotManager.snapshot(4);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
-        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        plan = scanner.getPlan(4, scan);
-        assertThat(plan.snapshotId).isEqualTo(4);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Collections.singletonList("+I 1|0"));
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
     }
 
     @Override
     protected String rowDataToString(RowData rowData) {
+        int numFiles;
+        try {
+            numFiles = dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
         return String.format(
-                "%s %d|%d",
-                rowData.getRowKind().shortString(), rowData.getInt(0), rowData.getInt(1));
+                "%s %d|%d|%d|%d",
+                rowData.getRowKind().shortString(),
+                rowData.getLong(0),
+                rowData.getInt(1),
+                rowData.getInt(2),
+                numFiles);
     }
 }
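
The updated assertions and rowDataToString above also document the new row layout produced by BucketsTable (constructed here with an extra boolean flag, presumably distinguishing the streaming compactor use case; the flag's name is not visible in this diff): a long snapshot id, an int partition field, an int bucket, and a binary column carrying the serialized list of DataFileMeta for that bucket. A minimal sketch of decoding one such row, following the test (field positions come from the test; everything else is an assumption):

    // Sketch: decoding one BucketsTable row as the test's rowDataToString does.
    long snapshotId = rowData.getLong(0);   // snapshot that produced these files
    int partition = rowData.getInt(1);      // partition value (a single int column in this test table)
    int bucket = rowData.getInt(2);         // bucket id
    List<DataFileMeta> files =              // deserializeList may throw IOException; handle as the test does
            new DataFileMetaSerializer().deserializeList(rowData.getBinary(3));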
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
index ef89f24e..6d8d87a3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
@@ -62,7 +62,7 @@ public class ContinuousCompactorStartingScannerTest extends SnapshotEnumeratorTe
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
 
-        BucketsTable bucketsTable = new BucketsTable(table);
+        BucketsTable bucketsTable = new BucketsTable(table, true);
         ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
         DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, bucketsTable.newScan());
         assertThat(plan.snapshotId).isEqualTo(3);
@@ -76,7 +76,7 @@ public class ContinuousCompactorStartingScannerTest extends SnapshotEnumeratorTe
     public void testNoSnapshot() throws Exception {
         FileStoreTable table = createFileStoreTable();
         SnapshotManager snapshotManager = table.snapshotManager();
-        BucketsTable bucketsTable = new BucketsTable(table);
+        BucketsTable bucketsTable = new BucketsTable(table, true);
         ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
         assertThat(scanner.getPlan(snapshotManager, bucketsTable.newScan())).isNull();
     }
diff --git a/flink-table-store-core/src/test/resources/log4j2-test.properties b/flink-table-store-core/src/test/resources/log4j2-test.properties
index 835c2ec9..1b3980d1 100644
--- a/flink-table-store-core/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-core/src/test/resources/log4j2-test.properties
@@ -25,4 +25,4 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index ada923b2..3d682efa 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -89,7 +89,7 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                 tableDdl,
                 testDataSourceDdl);
 
-        // run stand-alone compact job
+        // run dedicated compact job
         Container.ExecResult execResult =
                 jobManager.execInContainer(
                         "bin/flink",
diff --git a/flink-table-store-e2e-tests/src/test/resources/log4j2-test.properties b/flink-table-store-e2e-tests/src/test/resources/log4j2-test.properties
index 835c2ec9..1b3980d1 100644
--- a/flink-table-store-e2e-tests/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-e2e-tests/src/test/resources/log4j2-test.properties
@@ -25,4 +25,4 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
index ef3c3042..7f12b093 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
@@ -55,6 +55,7 @@ public class TableStoreInputSplitTest {
                 new TableStoreInputSplit(
                         tempDir.toString(),
                         new DataSplit(
+                                ThreadLocalRandom.current().nextLong(100),
                                 wantedPartition,
                                 0,
                                 generated.stream()
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties b/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
index 835c2ec9..1b3980d1 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
@@ -25,4 +25,4 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-kafka/src/test/resources/log4j2-test.properties b/flink-table-store-kafka/src/test/resources/log4j2-test.properties
index 863665cf..6f324f58 100644
--- a/flink-table-store-kafka/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-kafka/src/test/resources/log4j2-test.properties
@@ -25,7 +25,7 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
 
 logger.kafka.name = kafka
 logger.kafka.level = OFF
diff --git a/flink-table-store-spark/src/test/resources/log4j2-test.properties b/flink-table-store-spark/src/test/resources/log4j2-test.properties
index 863665cf..6f324f58 100644
--- a/flink-table-store-spark/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-spark/src/test/resources/log4j2-test.properties
@@ -25,7 +25,7 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
 
 logger.kafka.name = kafka
 logger.kafka.level = OFF
diff --git a/flink-table-store-spark2/src/test/resources/log4j2-test.properties b/flink-table-store-spark2/src/test/resources/log4j2-test.properties
index 863665cf..6f324f58 100644
--- a/flink-table-store-spark2/src/test/resources/log4j2-test.properties
+++ b/flink-table-store-spark2/src/test/resources/log4j2-test.properties
@@ -25,7 +25,7 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
 
 logger.kafka.name = kafka
 logger.kafka.level = OFF