Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/17 03:31:39 UTC

[incubator-paimon] branch master updated: [FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite (#605)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1af16a5a [FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite (#605)
1af16a5a is described below

commit 1af16a5a1cb27b6b72bbcef9e5862fda84d8c996
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Mar 17 11:31:34 2023 +0800

    [FLINK-31430] Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite (#605)
---
 .../file/append/AppendOnlyCompactManager.java      |   5 +
 .../table/store/file/append/AppendOnlyWriter.java  |  29 +--
 .../table/store/file/compact/CompactManager.java   |   3 +
 .../store/file/compact/NoopCompactManager.java     |   7 +
 .../store/file/mergetree/MergeTreeWriter.java      |  33 ++--
 .../mergetree/compact/MergeTreeCompactManager.java |   5 +
 .../file/operation/AbstractFileStoreWrite.java     | 210 ++++++++++++++++-----
 .../file/operation/AppendOnlyFileStoreWrite.java   |  30 +--
 .../file/operation/KeyValueFileStoreWrite.java     |  32 +---
 .../table/store/file/utils/CommitIncrement.java    |  47 +++++
 .../flink/table/store/file/utils/RecordWriter.java |  13 +-
 .../utils/Restorable.java}                         |  23 +--
 .../table/store/table/sink/TableWriteImpl.java     |  19 +-
 .../table/store/table/source/StreamTableScan.java  |  13 +-
 .../flink/table/store/file/TestFileStore.java      |  17 +-
 .../store/file/append/AppendOnlyWriterTest.java    |  14 +-
 .../store/file/format/FileFormatSuffixTest.java    |   7 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  10 +-
 .../store/file/operation/TestCommitThread.java     |  23 +--
 .../table/store/table/sink/TableWriteTest.java     | 187 ++++++++++++++++++
 .../source/FileStoreSourceReaderTest.java          |   2 +-
 .../source/FileStoreSourceSplitReaderTest.java     |  33 +---
 .../source/TestChangelogDataReadWrite.java         |  10 +-
 23 files changed, 538 insertions(+), 234 deletions(-)
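
Summary of the change for readers skimming the diff: AbstractFileStoreWrite and TableWriteImpl now implement the new Restorable interface, so the per-partition, per-bucket writer state (base snapshot id, last modified commit identifier, data files, and the pending CommitIncrement) can be checkpointed out of one instance and loaded into a fresh one. A minimal usage sketch follows; it is illustrative only and not code from this commit, and it assumes oldWrite and newWrite are TableWriteImpl instances created for the same table.

    import java.util.List;

    import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
    import org.apache.flink.table.store.table.sink.TableWriteImpl;

    /** Illustrative sketch only; not part of this commit. */
    public class WriterStateMigrationSketch {

        /**
         * Hands the in-flight writer state of one TableWriteImpl over to a fresh
         * instance created for the same table, using the checkpoint()/restore()
         * methods introduced by this commit.
         */
        public static <T> void migrate(TableWriteImpl<T> oldWrite, TableWriteImpl<T> newWrite) {
            // Extract the recoverable state: one State per (partition, bucket) holding the
            // base snapshot id, last modified commit identifier, data files and the
            // pending CommitIncrement of that writer.
            List<AbstractFileStoreWrite.State> states = oldWrite.checkpoint();

            // Load the state into the new instance; it can then keep writing and later
            // commit the files the old instance had not yet committed.
            newWrite.restore(states);
        }
    }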

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
index 132995e0..c7519a54 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
@@ -116,6 +116,11 @@ public class AppendOnlyCompactManager extends CompactFutureManager {
         toCompact.add(file);
     }
 
+    @Override
+    public List<DataFileMeta> allFiles() {
+        return toCompact;
+    }
+
     /** Finish current task, and update result files to {@link #toCompact}. */
     @Override
     public Optional<CompactResult> getCompactionResult(boolean blocking)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
index 6ec7883c..91005835 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.DataFilePathFactory;
 import org.apache.flink.table.store.file.io.NewFilesIncrement;
 import org.apache.flink.table.store.file.io.RowDataRollingFileWriter;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.fs.FileIO;
@@ -34,6 +35,8 @@ import org.apache.flink.table.store.types.RowType;
 import org.apache.flink.table.store.utils.LongCounter;
 import org.apache.flink.table.store.utils.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -71,7 +74,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
             long maxSequenceNumber,
             CompactManager compactManager,
             boolean forceCompact,
-            DataFilePathFactory pathFactory) {
+            DataFilePathFactory pathFactory,
+            @Nullable CommitIncrement increment) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
@@ -86,6 +90,12 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
 
         this.writer = createRollingRowWriter();
+
+        if (increment != null) {
+            newFiles.addAll(increment.newFilesIncrement().newFiles());
+            compactBefore.addAll(increment.compactIncrement().compactBefore());
+            compactAfter.addAll(increment.compactIncrement().compactAfter());
+        }
     }
 
     @Override
@@ -107,6 +117,11 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
         files.forEach(compactManager::addNewFile);
     }
 
+    @Override
+    public List<DataFileMeta> dataFiles() {
+        return compactManager.allFiles();
+    }
+
     @Override
     public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
         flushWriter(false, false);
@@ -186,16 +201,6 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
         compactBefore.clear();
         compactAfter.clear();
 
-        return new CommitIncrement() {
-            @Override
-            public NewFilesIncrement newFilesIncrement() {
-                return newFilesIncrement;
-            }
-
-            @Override
-            public CompactIncrement compactIncrement() {
-                return compactIncrement;
-            }
-        };
+        return new CommitIncrement(newFilesIncrement, compactIncrement);
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index 982405d4..24d0dfdb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.compact;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 
 import java.io.Closeable;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
@@ -33,6 +34,8 @@ public interface CompactManager extends Closeable {
     /** Add a new file. */
     void addNewFile(DataFileMeta file);
 
+    List<DataFileMeta> allFiles();
+
     /**
      * Trigger a new compaction task.
      *
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
index 3aff478d..071dbd76 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.utils.Preconditions;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
@@ -39,6 +41,11 @@ public class NoopCompactManager implements CompactManager {
     @Override
     public void addNewFile(DataFileMeta file) {}
 
+    @Override
+    public List<DataFileMeta> allFiles() {
+        return Collections.emptyList();
+    }
+
     @Override
     public void triggerCompaction(boolean fullCompaction) {
         Preconditions.checkArgument(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 28a1f742..4111e7f8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -33,9 +33,12 @@ import org.apache.flink.table.store.file.io.RollingFileWriter;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.memory.MemorySegmentPool;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.types.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.LinkedHashMap;
@@ -80,7 +83,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
             MergeFunction<KeyValue> mergeFunction,
             KeyValueFileWriterFactory writerFactory,
             boolean commitForceCompact,
-            ChangelogProducer changelogProducer) {
+            ChangelogProducer changelogProducer,
+            @Nullable CommitIncrement increment) {
         this.writeBufferSpillable = writeBufferSpillable;
         this.sortMaxFan = sortMaxFan;
         this.ioManager = ioManager;
@@ -99,6 +103,16 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         this.compactBefore = new LinkedHashMap<>();
         this.compactAfter = new LinkedHashSet<>();
         this.compactChangelog = new LinkedHashSet<>();
+        if (increment != null) {
+            newFiles.addAll(increment.newFilesIncrement().newFiles());
+            newFilesChangelog.addAll(increment.newFilesIncrement().changelogFiles());
+            increment
+                    .compactIncrement()
+                    .compactBefore()
+                    .forEach(f -> compactBefore.put(f.fileName(), f));
+            compactAfter.addAll(increment.compactIncrement().compactAfter());
+            compactChangelog.addAll(increment.compactIncrement().changelogFiles());
+        }
     }
 
     private long newSequenceNumber() {
@@ -148,6 +162,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         files.forEach(compactManager::addNewFile);
     }
 
+    @Override
+    public List<DataFileMeta> dataFiles() {
+        return compactManager.allFiles();
+    }
+
     @Override
     public long memoryOccupancy() {
         return writeBuffer.memoryOccupancy();
@@ -232,17 +251,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         compactAfter.clear();
         compactChangelog.clear();
 
-        return new CommitIncrement() {
-            @Override
-            public NewFilesIncrement newFilesIncrement() {
-                return newFilesIncrement;
-            }
-
-            @Override
-            public CompactIncrement compactIncrement() {
-                return compactIncrement;
-            }
-        };
+        return new CommitIncrement(newFilesIncrement, compactIncrement);
     }
 
     private void updateCompactResult(CompactResult result) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index 58c02c4c..9efc971b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -79,6 +79,11 @@ public class MergeTreeCompactManager extends CompactFutureManager {
         levels.addLevel0File(file);
     }
 
+    @Override
+    public List<DataFileMeta> allFiles() {
+        return levels.allFiles();
+    }
+
     @Override
     public void triggerCompaction(boolean fullCompaction) {
         Optional<CompactUnit> optionalUnit;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index 74d581df..3a3e79a9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -24,8 +24,10 @@ import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.disk.IOManager;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.file.utils.ExecutorThreadFactory;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.Restorable;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.CommitMessage;
 import org.apache.flink.table.store.table.sink.CommitMessageImpl;
@@ -50,12 +52,13 @@ import java.util.concurrent.Executors;
  *
  * @param <T> type of record to write.
  */
-public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
+public abstract class AbstractFileStoreWrite<T>
+        implements FileStoreWrite<T>, Restorable<List<AbstractFileStoreWrite.State>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);
 
     private final String commitUser;
-    protected final SnapshotManager snapshotManager;
+    private final SnapshotManager snapshotManager;
     private final FileStoreScan scan;
 
     @Nullable protected IOManager ioManager;
@@ -74,35 +77,12 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
         this.writers = new HashMap<>();
     }
 
-    private ExecutorService compactExecutor() {
-        if (lazyCompactExecutor == null) {
-            lazyCompactExecutor =
-                    Executors.newSingleThreadScheduledExecutor(
-                            new ExecutorThreadFactory(
-                                    Thread.currentThread().getName() + "-compaction"));
-        }
-        return lazyCompactExecutor;
-    }
-
     @Override
     public FileStoreWrite<T> withIOManager(IOManager ioManager) {
         this.ioManager = ioManager;
         return this;
     }
 
-    protected List<DataFileMeta> scanExistingFileMetas(
-            Long snapshotId, BinaryRow partition, int bucket) {
-        List<DataFileMeta> existingFileMetas = new ArrayList<>();
-        if (snapshotId != null) {
-            // Concat all the DataFileMeta of existing files into existingFileMetas.
-            scan.withSnapshot(snapshotId).withPartitionFilter(Collections.singletonList(partition))
-                    .withBucket(bucket).plan().files().stream()
-                    .map(ManifestEntry::file)
-                    .forEach(existingFileMetas::add);
-        }
-        return existingFileMetas;
-    }
-
     public void withOverwrite(boolean overwrite) {
         this.overwrite = overwrite;
     }
@@ -177,8 +157,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
                 int bucket = entry.getKey();
                 WriterContainer<T> writerContainer = entry.getValue();
 
-                RecordWriter.CommitIncrement increment =
-                        writerContainer.writer.prepareCommit(waitCompaction);
+                CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
                 CommitMessageImpl committable =
                         new CommitMessageImpl(
                                 partition,
@@ -232,6 +211,68 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
         }
     }
 
+    @Override
+    public List<State> checkpoint() {
+        List<State> result = new ArrayList<>();
+
+        for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry :
+                writers.entrySet()) {
+            BinaryRow partition = partitionEntry.getKey();
+            for (Map.Entry<Integer, WriterContainer<T>> bucketEntry :
+                    partitionEntry.getValue().entrySet()) {
+                int bucket = bucketEntry.getKey();
+                WriterContainer<T> writerContainer = bucketEntry.getValue();
+
+                CommitIncrement increment;
+                try {
+                    increment = writerContainer.writer.prepareCommit(false);
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            "Failed to extract state from writer of partition "
+                                    + partition
+                                    + " bucket "
+                                    + bucket,
+                            e);
+                }
+                // writer.allFiles() must be fetched after writer.prepareCommit(), because
+                // compaction result might be updated during prepareCommit
+                List<DataFileMeta> dataFiles = writerContainer.writer.dataFiles();
+                result.add(
+                        new State(
+                                partition,
+                                bucket,
+                                writerContainer.baseSnapshotId,
+                                writerContainer.lastModifiedCommitIdentifier,
+                                dataFiles,
+                                increment));
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Extracted state " + result.toString());
+        }
+        return result;
+    }
+
+    @Override
+    public void restore(List<State> states) {
+        for (State state : states) {
+            RecordWriter<T> writer =
+                    createWriter(
+                            state.partition,
+                            state.bucket,
+                            state.dataFiles,
+                            state.commitIncrement,
+                            compactExecutor());
+            notifyNewWriter(writer);
+            WriterContainer<T> writerContainer =
+                    new WriterContainer<>(writer, state.baseSnapshotId);
+            writerContainer.lastModifiedCommitIdentifier = state.lastModifiedCommitIdentifier;
+            writers.computeIfAbsent(state.partition, k -> new HashMap<>())
+                    .put(state.bucket, writerContainer);
+        }
+    }
+
     private WriterContainer<T> getWriterWrapper(BinaryRow partition, int bucket) {
         Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
         if (buckets == null) {
@@ -239,32 +280,70 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
             writers.put(partition.copy(), buckets);
         }
         return buckets.computeIfAbsent(
-                bucket, k -> createWriterContainer(partition.copy(), bucket));
+                bucket, k -> createWriterContainer(partition.copy(), bucket, overwrite));
     }
 
-    private WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket) {
+    @VisibleForTesting
+    public WriterContainer<T> createWriterContainer(
+            BinaryRow partition, int bucket, boolean emptyWriter) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
         }
-        WriterContainer<T> writerContainer =
-                overwrite
-                        ? createEmptyWriterContainer(partition.copy(), bucket, compactExecutor())
-                        : createWriterContainer(partition.copy(), bucket, compactExecutor());
-        notifyNewWriter(writerContainer.writer);
-        return writerContainer;
+
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        RecordWriter<T> writer;
+        if (emptyWriter) {
+            writer =
+                    createWriter(
+                            partition.copy(),
+                            bucket,
+                            Collections.emptyList(),
+                            null,
+                            compactExecutor());
+        } else {
+            writer =
+                    createWriter(
+                            partition.copy(),
+                            bucket,
+                            scanExistingFileMetas(latestSnapshotId, partition, bucket),
+                            null,
+                            compactExecutor());
+        }
+        notifyNewWriter(writer);
+        return new WriterContainer<>(writer, latestSnapshotId);
     }
 
-    protected void notifyNewWriter(RecordWriter<T> writer) {}
+    private List<DataFileMeta> scanExistingFileMetas(
+            Long snapshotId, BinaryRow partition, int bucket) {
+        List<DataFileMeta> existingFileMetas = new ArrayList<>();
+        if (snapshotId != null) {
+            // Concat all the DataFileMeta of existing files into existingFileMetas.
+            scan.withSnapshot(snapshotId).withPartitionFilter(Collections.singletonList(partition))
+                    .withBucket(bucket).plan().files().stream()
+                    .map(ManifestEntry::file)
+                    .forEach(existingFileMetas::add);
+        }
+        return existingFileMetas;
+    }
 
-    /** Create a {@link RecordWriter} from partition and bucket. */
-    @VisibleForTesting
-    public abstract WriterContainer<T> createWriterContainer(
-            BinaryRow partition, int bucket, ExecutorService compactExecutor);
+    private ExecutorService compactExecutor() {
+        if (lazyCompactExecutor == null) {
+            lazyCompactExecutor =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ExecutorThreadFactory(
+                                    Thread.currentThread().getName() + "-compaction"));
+        }
+        return lazyCompactExecutor;
+    }
 
-    /** Create an empty {@link RecordWriter} from partition and bucket. */
-    @VisibleForTesting
-    public abstract WriterContainer<T> createEmptyWriterContainer(
-            BinaryRow partition, int bucket, ExecutorService compactExecutor);
+    protected void notifyNewWriter(RecordWriter<T> writer) {}
+
+    protected abstract RecordWriter<T> createWriter(
+            BinaryRow partition,
+            int bucket,
+            List<DataFileMeta> restoreFiles,
+            @Nullable CommitIncrement restoreIncrement,
+            ExecutorService compactExecutor);
 
     /**
      * {@link RecordWriter} with the snapshot id it is created upon and the identifier of its last
@@ -272,10 +351,9 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
      */
     @VisibleForTesting
     public static class WriterContainer<T> {
-
         public final RecordWriter<T> writer;
-        private final long baseSnapshotId;
-        private long lastModifiedCommitIdentifier;
+        protected final long baseSnapshotId;
+        protected long lastModifiedCommitIdentifier;
 
         protected WriterContainer(RecordWriter<T> writer, Long baseSnapshotId) {
             this.writer = writer;
@@ -284,4 +362,42 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
             this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
         }
     }
+
+    /** Recoverable state of {@link AbstractFileStoreWrite}. */
+    public static class State {
+        protected final BinaryRow partition;
+        protected final int bucket;
+
+        protected final long baseSnapshotId;
+        protected final long lastModifiedCommitIdentifier;
+        protected final List<DataFileMeta> dataFiles;
+        protected final CommitIncrement commitIncrement;
+
+        protected State(
+                BinaryRow partition,
+                int bucket,
+                long baseSnapshotId,
+                long lastModifiedCommitIdentifier,
+                List<DataFileMeta> dataFiles,
+                CommitIncrement commitIncrement) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.baseSnapshotId = baseSnapshotId;
+            this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
+            this.dataFiles = dataFiles;
+            this.commitIncrement = commitIncrement;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "{%s, %d, %d, %d, %s, %s}",
+                    partition,
+                    bucket,
+                    baseSnapshotId,
+                    lastModifiedCommitIdentifier,
+                    dataFiles,
+                    commitIncrement);
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index c0b106a6..e46a4e8a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.compact.NoopCompactManager;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.DataFilePathFactory;
 import org.apache.flink.table.store.file.io.RowDataRollingFileWriter;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -38,6 +39,8 @@ import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.types.RowType;
 import org.apache.flink.table.store.utils.LongCounter;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -85,31 +88,11 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
     }
 
     @Override
-    public WriterContainer<InternalRow> createWriterContainer(
-            BinaryRow partition, int bucket, ExecutorService compactExecutor) {
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
-        RecordWriter<InternalRow> writer =
-                createWriter(
-                        partition,
-                        bucket,
-                        scanExistingFileMetas(latestSnapshotId, partition, bucket),
-                        compactExecutor);
-        return new WriterContainer<>(writer, latestSnapshotId);
-    }
-
-    @Override
-    public WriterContainer<InternalRow> createEmptyWriterContainer(
-            BinaryRow partition, int bucket, ExecutorService compactExecutor) {
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
-        RecordWriter<InternalRow> writer =
-                createWriter(partition, bucket, Collections.emptyList(), compactExecutor);
-        return new WriterContainer<>(writer, latestSnapshotId);
-    }
-
-    private RecordWriter<InternalRow> createWriter(
+    protected RecordWriter<InternalRow> createWriter(
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> restoredFiles,
+            @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor) {
         // let writer and compact manager hold the same reference
         // and make restore files mutable to update
@@ -136,7 +119,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
                 getMaxSequenceNumber(restored),
                 compactManager,
                 commitForceCompact,
-                factory);
+                factory,
+                restoreIncrement);
     }
 
     private AppendOnlyCompactManager.CompactRewriter compactRewriter(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 97778e29..cd5822a4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -42,8 +42,8 @@ import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactRewri
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
 import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.format.FileFormatDiscover;
 import org.apache.flink.table.store.fs.FileIO;
@@ -53,7 +53,8 @@ import org.apache.flink.table.store.types.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import javax.annotation.Nullable;
+
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -118,31 +119,11 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
     }
 
     @Override
-    public WriterContainer<KeyValue> createWriterContainer(
-            BinaryRow partition, int bucket, ExecutorService compactExecutor) {
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
-        RecordWriter<KeyValue> writer =
-                createMergeTreeWriter(
-                        partition,
-                        bucket,
-                        scanExistingFileMetas(latestSnapshotId, partition, bucket),
-                        compactExecutor);
-        return new WriterContainer<>(writer, latestSnapshotId);
-    }
-
-    @Override
-    public WriterContainer<KeyValue> createEmptyWriterContainer(
-            BinaryRow partition, int bucket, ExecutorService compactExecutor) {
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
-        RecordWriter<KeyValue> writer =
-                createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
-        return new WriterContainer<>(writer, latestSnapshotId);
-    }
-
-    private MergeTreeWriter createMergeTreeWriter(
+    protected MergeTreeWriter createWriter(
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> restoreFiles,
+            @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor) {
         if (LOG.isDebugEnabled()) {
             LOG.debug(
@@ -178,7 +159,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                 mfFactory.create(),
                 writerFactory,
                 options.commitForceCompact(),
-                options.changelogProducer());
+                options.changelogProducer(),
+                restoreIncrement);
     }
 
     private boolean bufferSpillable() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/CommitIncrement.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/CommitIncrement.java
new file mode 100644
index 00000000..e1aa61ec
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/CommitIncrement.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.store.file.io.CompactIncrement;
+import org.apache.flink.table.store.file.io.NewFilesIncrement;
+
+/** Changes to commit. */
+public class CommitIncrement {
+
+    private final NewFilesIncrement newFilesIncrement;
+    private final CompactIncrement compactIncrement;
+
+    public CommitIncrement(NewFilesIncrement newFilesIncrement, CompactIncrement compactIncrement) {
+        this.newFilesIncrement = newFilesIncrement;
+        this.compactIncrement = compactIncrement;
+    }
+
+    public NewFilesIncrement newFilesIncrement() {
+        return newFilesIncrement;
+    }
+
+    public CompactIncrement compactIncrement() {
+        return compactIncrement;
+    }
+
+    @Override
+    public String toString() {
+        return newFilesIncrement.toString() + "\n" + compactIncrement;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
index 6c561486..447a23f1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -18,9 +18,7 @@
 
 package org.apache.flink.table.store.file.utils;
 
-import org.apache.flink.table.store.file.io.CompactIncrement;
 import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.io.NewFilesIncrement;
 
 import java.util.List;
 
@@ -51,6 +49,9 @@ public interface RecordWriter<T> {
      */
     void addNewFiles(List<DataFileMeta> files);
 
+    /** Get all data files maintained by this writer. */
+    List<DataFileMeta> dataFiles();
+
     /**
      * Prepare for a commit.
      *
@@ -67,12 +68,4 @@ public interface RecordWriter<T> {
 
     /** Close this writer, the call will delete newly generated but not committed files. */
     void close() throws Exception;
-
-    /** Changes to commit. */
-    interface CommitIncrement {
-
-        NewFilesIncrement newFilesIncrement();
-
-        CompactIncrement compactIncrement();
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/Restorable.java
similarity index 59%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/Restorable.java
index 98526a1e..f56ccf5f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/Restorable.java
@@ -16,24 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.table.source;
-
-import org.apache.flink.table.store.annotation.Experimental;
-
-import javax.annotation.Nullable;
+package org.apache.flink.table.store.file.utils;
 
 /**
- * {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
+ * Operations implementing this interface can checkpoint and restore their states between different
+ * instances.
  *
- * @since 0.4.0
+ * @param <S> type of state
  */
-@Experimental
-public interface StreamTableScan extends TableScan {
+public interface Restorable<S> {
 
-    /** Checkpoint this stream table scan, return next snapshot id. */
-    @Nullable
-    Long checkpoint();
+    /** Extract state of the current operation instance. */
+    S checkpoint();
 
-    /** Restore this stream table scan, read incremental from {@code nextSnapshotId}. */
-    void restore(@Nullable Long nextSnapshotId);
+    /** Restore state of a previous operation instance into the current operation instance. */
+    void restore(S state);
 }
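
As an aside, the contract of the new Restorable interface shown above can be illustrated with a tiny, hypothetical example (not from this commit): any operation that can snapshot its internal state into some type S and rebuild itself from such a snapshot qualifies.

    import org.apache.flink.table.store.file.utils.Restorable;

    /** Hypothetical example: a counter whose value survives the switch to a new instance. */
    class RestorableCounter implements Restorable<Long> {

        private long count = 0;

        void add(long delta) {
            count += delta;
        }

        @Override
        public Long checkpoint() {
            // Extract the state of the current instance.
            return count;
        }

        @Override
        public void restore(Long state) {
            // Take over the state of a previous instance.
            this.count = state;
        }
    }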
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index 2dd51b92..b304d22d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -22,7 +22,9 @@ import org.apache.flink.table.store.data.BinaryRow;
 import org.apache.flink.table.store.data.InternalRow;
 import org.apache.flink.table.store.file.disk.IOManager;
 import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.utils.Restorable;
 
 import java.util.List;
 
@@ -33,9 +35,10 @@ import static org.apache.flink.table.store.utils.Preconditions.checkState;
  *
  * @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
  */
-public class TableWriteImpl<T> implements InnerTableWrite {
+public class TableWriteImpl<T>
+        implements InnerTableWrite, Restorable<List<AbstractFileStoreWrite.State>> {
 
-    private final FileStoreWrite<T> write;
+    private final AbstractFileStoreWrite<T> write;
     private final SinkRecordConverter recordConverter;
     private final RecordExtractor<T> recordExtractor;
 
@@ -45,7 +48,7 @@ public class TableWriteImpl<T> implements InnerTableWrite {
             FileStoreWrite<T> write,
             SinkRecordConverter recordConverter,
             RecordExtractor<T> recordExtractor) {
-        this.write = write;
+        this.write = (AbstractFileStoreWrite<T>) write;
         this.recordConverter = recordConverter;
         this.recordExtractor = recordExtractor;
     }
@@ -121,6 +124,16 @@ public class TableWriteImpl<T> implements InnerTableWrite {
         write.close();
     }
 
+    @Override
+    public List<AbstractFileStoreWrite.State> checkpoint() {
+        return write.checkpoint();
+    }
+
+    @Override
+    public void restore(List<AbstractFileStoreWrite.State> state) {
+        write.restore(state);
+    }
+
     /** Extractor to extract {@link T} from the {@link SinkRecord}. */
     public interface RecordExtractor<T> {
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
index 98526a1e..9fd99f51 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
@@ -19,8 +19,7 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.store.annotation.Experimental;
-
-import javax.annotation.Nullable;
+import org.apache.flink.table.store.file.utils.Restorable;
 
 /**
  * {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
@@ -28,12 +27,4 @@ import javax.annotation.Nullable;
  * @since 0.4.0
  */
 @Experimental
-public interface StreamTableScan extends TableScan {
-
-    /** Checkpoint this stream table scan, return next snapshot id. */
-    @Nullable
-    Long checkpoint();
-
-    /** Restore this stream table scan, read incremental from {@code nextSnapshotId}. */
-    void restore(@Nullable Long nextSnapshotId);
-}
+public interface StreamTableScan extends TableScan, Restorable<Long> {}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index c776b39a..3261ee59 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
 import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -65,8 +66,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -214,15 +213,10 @@ public class TestFileStore extends KeyValueFileStore {
                             bucket,
                             (b, w) -> {
                                 if (w == null) {
-                                    ExecutorService service = Executors.newSingleThreadExecutor();
                                     RecordWriter<KeyValue> writer =
-                                            emptyWriter
-                                                    ? write.createEmptyWriterContainer(
-                                                                    partition, bucket, service)
-                                                            .writer
-                                                    : write.createWriterContainer(
-                                                                    partition, bucket, service)
-                                                            .writer;
+                                            write.createWriterContainer(
+                                                            partition, bucket, emptyWriter)
+                                                    .writer;
                                     ((MemoryOwner) writer)
                                             .setMemoryPool(
                                                     new HeapMemorySegmentPool(
@@ -244,8 +238,7 @@ public class TestFileStore extends KeyValueFileStore {
                 writers.entrySet()) {
             for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
                     entryWithPartition.getValue().entrySet()) {
-                RecordWriter.CommitIncrement increment =
-                        entryWithBucket.getValue().prepareCommit(emptyWriter);
+                CommitIncrement increment = entryWithBucket.getValue().prepareCommit(emptyWriter);
                 committable.addFileCommittable(
                         new CommitMessageImpl(
                                 entryWithPartition.getKey(),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
index 0c06c00b..fcd1ecdb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.data.InternalRow;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.DataFilePathFactory;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.file.utils.ExecutorThreadFactory;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.format.FieldStats;
@@ -87,7 +88,7 @@ public class AppendOnlyWriterTest {
 
         for (int i = 0; i < 3; i++) {
             writer.sync();
-            RecordWriter.CommitIncrement inc = writer.prepareCommit(true);
+            CommitIncrement inc = writer.prepareCommit(true);
 
             assertThat(inc.newFilesIncrement().isEmpty()).isTrue();
             assertThat(inc.compactIncrement().isEmpty()).isTrue();
@@ -98,7 +99,7 @@ public class AppendOnlyWriterTest {
     public void testSingleWrite() throws Exception {
         RecordWriter<InternalRow> writer = createEmptyWriter(1024 * 1024L);
         writer.write(row(1, "AAA", PART));
-        RecordWriter.CommitIncrement increment = writer.prepareCommit(true);
+        CommitIncrement increment = writer.prepareCommit(true);
         writer.close();
 
         assertThat(increment.newFilesIncrement().newFiles().size()).isEqualTo(1);
@@ -140,7 +141,7 @@ public class AppendOnlyWriterTest {
             }
 
             writer.sync();
-            RecordWriter.CommitIncrement inc = writer.prepareCommit(true);
+            CommitIncrement inc = writer.prepareCommit(true);
             if (txn > 0 && txn % 3 == 0) {
                 assertThat(inc.compactIncrement().compactBefore()).hasSize(4);
                 assertThat(inc.compactIncrement().compactAfter()).hasSize(1);
@@ -199,7 +200,7 @@ public class AppendOnlyWriterTest {
         }
 
         writer.sync();
-        RecordWriter.CommitIncrement firstInc = writer.prepareCommit(true);
+        CommitIncrement firstInc = writer.prepareCommit(true);
         assertThat(firstInc.compactIncrement().compactBefore()).isEqualTo(Collections.emptyList());
         assertThat(firstInc.compactIncrement().compactAfter()).isEqualTo(Collections.emptyList());
 
@@ -241,7 +242,7 @@ public class AppendOnlyWriterTest {
         assertThat(toCompact).containsExactlyElementsOf(firstInc.newFilesIncrement().newFiles());
         writer.write(row(id, String.format("%03d", id), PART));
         writer.sync();
-        RecordWriter.CommitIncrement secInc = writer.prepareCommit(true);
+        CommitIncrement secInc = writer.prepareCommit(true);
 
         // check compact before and after
         List<DataFileMeta> compactBefore = secInc.compactIncrement().compactBefore();
@@ -328,7 +329,8 @@ public class AppendOnlyWriterTest {
                                                         generateCompactAfter(compactBefore)),
                                 pathFactory),
                         forceCompact,
-                        pathFactory),
+                        pathFactory,
+                        null),
                 toCompact);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 8a1707e2..9024ac42 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.DataFilePathFactory;
 import org.apache.flink.table.store.file.io.KeyValueFileReadWriteTest;
 import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.fs.Path;
 import org.apache.flink.table.store.fs.local.LocalFileIO;
@@ -79,10 +79,11 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
                                 null,
                                 dataFilePathFactory), // not used
                         false,
-                        dataFilePathFactory);
+                        dataFilePathFactory,
+                        null);
         appendOnlyWriter.write(
                 GenericRow.of(1, BinaryString.fromString("aaa"), BinaryString.fromString("1")));
-        RecordWriter.CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
+        CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
         appendOnlyWriter.close();
 
         DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 6707649c..3dcb3453 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.format.FileFormat;
@@ -254,7 +255,7 @@ public class MergeTreeTest {
             }
             writeAll(records);
             expected.addAll(records);
-            RecordWriter.CommitIncrement increment = writer.prepareCommit(true);
+            CommitIncrement increment = writer.prepareCommit(true);
             mergeCompacted(newFileNames, compactedFiles, increment);
         }
         writer.close();
@@ -285,7 +286,7 @@ public class MergeTreeTest {
                 writer.sync();
             }
 
-            RecordWriter.CommitIncrement increment = writer.prepareCommit(true);
+            CommitIncrement increment = writer.prepareCommit(true);
             newFiles.addAll(increment.newFilesIncrement().newFiles());
             mergeCompacted(newFileNames, compactedFiles, increment);
         }
@@ -327,7 +328,8 @@ public class MergeTreeTest {
                         DeduplicateMergeFunction.factory().create(),
                         writerFactory,
                         options.commitForceCompact(),
-                        ChangelogProducer.NONE);
+                        ChangelogProducer.NONE,
+                        null);
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
         return writer;
@@ -354,7 +356,7 @@ public class MergeTreeTest {
     private void mergeCompacted(
             Set<String> newFileNames,
             List<DataFileMeta> compactedFiles,
-            RecordWriter.CommitIncrement increment) {
+            CommitIncrement increment) {
         increment.newFilesIncrement().newFiles().stream()
                 .map(DataFileMeta::fileName)
                 .forEach(newFileNames::add);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 4780421c..232c6f7f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.CommitIncrement;
 import org.apache.flink.table.store.table.sink.CommitMessageImpl;
 import org.apache.flink.table.store.types.RowType;
 
@@ -41,8 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -139,7 +137,7 @@ public class TestCommitThread extends Thread {
         }
         ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
         for (Map.Entry<BinaryRow, MergeTreeWriter> entry : writers.entrySet()) {
-            RecordWriter.CommitIncrement inc = entry.getValue().prepareCommit(true);
+            CommitIncrement inc = entry.getValue().prepareCommit(true);
             committable.addFileCommittable(
                     new CommitMessageImpl(
                             entry.getKey(), 0, inc.newFilesIncrement(), inc.compactIncrement()));
@@ -151,7 +149,7 @@ public class TestCommitThread extends Thread {
     private void doOverwrite() throws Exception {
         BinaryRow partition = overwriteData();
         ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
-        RecordWriter.CommitIncrement inc = writers.get(partition).prepareCommit(true);
+        CommitIncrement inc = writers.get(partition).prepareCommit(true);
         committable.addFileCommittable(
                 new CommitMessageImpl(
                         partition, 0, inc.newFilesIncrement(), inc.compactIncrement()));
@@ -174,7 +172,7 @@ public class TestCommitThread extends Thread {
                     MergeTreeWriter writer =
                             writers.computeIfAbsent(partition, p -> createWriter(p, false));
                     writer.compact(true);
-                    RecordWriter.CommitIncrement inc = writer.prepareCommit(true);
+                    CommitIncrement inc = writer.prepareCommit(true);
                     committable.addFileCommittable(
                             new CommitMessageImpl(
                                     partition, 0, inc.newFilesIncrement(), inc.compactIncrement()));
@@ -275,19 +273,8 @@ public class TestCommitThread extends Thread {
     }
 
     private MergeTreeWriter createWriter(BinaryRow partition, boolean empty) {
-        ExecutorService service =
-                Executors.newSingleThreadExecutor(
-                        r -> {
-                            Thread t = new Thread(r);
-                            t.setName(Thread.currentThread().getName() + "-writer-service-pool");
-                            return t;
-                        });
         MergeTreeWriter writer =
-                empty
-                        ? (MergeTreeWriter)
-                                write.createEmptyWriterContainer(partition, 0, service).writer
-                        : (MergeTreeWriter)
-                                write.createWriterContainer(partition, 0, service).writer;
+                (MergeTreeWriter) write.createWriterContainer(partition, 0, empty).writer;
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(
                         WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes()));
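
For context, the hunk above shows the single entry point tests now use to obtain a writer. A condensed sketch (assuming the same write, partition, empty flag and option constants as in TestCommitThread; the compaction executor is presumably created inside AbstractFileStoreWrite now, so the caller no longer builds one):

    // sketch only, based on the lines above
    MergeTreeWriter writer =
            (MergeTreeWriter) write.createWriterContainer(partition, 0, empty).writer;
    writer.setMemoryPool(
            new HeapMemorySegmentPool(
                    WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes()));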
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/TableWriteTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/TableWriteTest.java
new file mode 100644
index 00000000..c9f6cd93
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/TableWriteTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.SchemaUtils;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.utils.TraceableFileIO;
+import org.apache.flink.table.store.fs.Path;
+import org.apache.flink.table.store.fs.local.LocalFileIO;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.options.Options;
+import org.apache.flink.table.store.reader.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.DataTypes;
+import org.apache.flink.table.store.types.RowType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TableWriteImpl}. */
+public class TableWriteTest {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
+                    new String[] {"pt", "k", "v"});
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private Path tablePath;
+    private String commitUser;
+
+    @BeforeEach
+    public void before() {
+        tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    @AfterEach
+    public void after() {
+        // assert all connections are closed
+        Predicate<Path> pathPredicate = path -> path.toString().contains(tempDir.toString());
+        assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
+        assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+    }
+
+    @Test
+    public void testExtractAndRecoverState() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int commitCount = random.nextInt(10) + 1;
+        int extractCount = random.nextInt(10) + 1;
+        int numRecords = 1000;
+        int numPartitions = 2;
+        int numKeys = 100;
+
+        Map<Integer, List<Event>> events = new HashMap<>();
+        for (int i = 0; i < commitCount; i++) {
+            int prepareTime = random.nextInt(numRecords);
+            int commitTime = random.nextInt(prepareTime, numRecords);
+            events.computeIfAbsent(prepareTime, k -> new ArrayList<>()).add(Event.PREPARE_COMMIT);
+            events.computeIfAbsent(commitTime, k -> new ArrayList<>()).add(Event.COMMIT);
+        }
+        for (int i = 0; i < extractCount; i++) {
+            int extractTime = random.nextInt(numRecords);
+            List<Event> eventList = events.computeIfAbsent(extractTime, k -> new ArrayList<>());
+            eventList.add(random.nextInt(eventList.size() + 1), Event.EXTRACT_STATE);
+        }
+
+        FileStoreTable table = createFileStoreTable();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        Map<String, Long> expected = new HashMap<>();
+        List<List<CommitMessage>> commitList = new ArrayList<>();
+        int commitId = 0;
+        for (int i = 0; i < numRecords; i++) {
+            if (events.containsKey(i)) {
+                List<Event> eventList = events.get(i);
+                for (Event event : eventList) {
+                    switch (event) {
+                        case PREPARE_COMMIT:
+                            List<CommitMessage> messages =
+                                    write.prepareCommit(false, commitList.size());
+                            commitList.add(messages);
+                            break;
+                        case COMMIT:
+                            commit.commit(commitId, commitList.get(commitId));
+                            commitId++;
+                            break;
+                        case EXTRACT_STATE:
+                            List<AbstractFileStoreWrite.State> state = write.checkpoint();
+                            write.close();
+                            write = table.newWrite(commitUser);
+                            write.restore(state);
+                            break;
+                    }
+                }
+            }
+
+            int partition = random.nextInt(numPartitions);
+            int key = random.nextInt(numKeys);
+            long value = random.nextLong();
+            write.write(GenericRow.of(partition, key, value));
+            expected.put(partition + "|" + key, value);
+        }
+
+        assertThat(commitId).isEqualTo(commitCount);
+        List<CommitMessage> messages = write.prepareCommit(false, commitCount);
+        commit.commit(commitCount, messages);
+        write.close();
+        commit.close();
+
+        Map<String, Long> actual = new HashMap<>();
+        TableScan.Plan plan = table.newScan().plan();
+        try (RecordReaderIterator<InternalRow> it =
+                new RecordReaderIterator<>(table.newRead().createReader(plan))) {
+            while (it.hasNext()) {
+                InternalRow row = it.next();
+                actual.put(row.getInt(0) + "|" + row.getInt(1), row.getLong(2));
+            }
+        }
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private enum Event {
+        PREPARE_COMMIT,
+        COMMIT,
+        EXTRACT_STATE
+    }
+
+    private FileStoreTable createFileStoreTable() throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, 2);
+        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                ROW_TYPE.getFields(),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "k"),
+                                conf.toMap(),
+                                ""));
+        return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
+    }
+}
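
The state migration that TableWriteTest exercises can be condensed to the following sketch (same table, commitUser and row layout as above; the row values and commit identifier are chosen purely for illustration):

    // write a row, hand the state to a fresh TableWriteImpl, then finish the commit
    TableWriteImpl<?> write = table.newWrite(commitUser);
    StreamTableCommit commit = table.newCommit(commitUser);
    write.write(GenericRow.of(1, 10, 100L));

    List<AbstractFileStoreWrite.State> state = write.checkpoint();
    write.close();
    write = table.newWrite(commitUser);
    write.restore(state); // migrate in-memory state to the new instance

    write.write(GenericRow.of(1, 20, 200L));
    List<CommitMessage> messages = write.prepareCommit(false, 0);
    commit.commit(0, messages);
    write.close();
    commit.close();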
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index 12c797d4..df4e3d6b 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -89,7 +89,7 @@ public class FileStoreSourceReaderTest {
         return new FileStoreSourceReader<>(
                 RecordsFunction.forIterate(),
                 context,
-                new TestChangelogDataReadWrite(tempDir.toString(), null).createReadWithKey(),
+                new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
                 null);
     }
 
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 9ab21e06..4c65cf54 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -38,8 +38,6 @@ import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -50,8 +48,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -65,21 +61,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Test for {@link FileStoreSourceSplitReader}. */
 public class FileStoreSourceSplitReaderTest {
 
-    private static ExecutorService service;
-
     @TempDir java.nio.file.Path tempDir;
 
-    @BeforeAll
-    public static void before() {
-        service = Executors.newSingleThreadExecutor();
-    }
-
-    @AfterAll
-    public static void after() {
-        service.shutdownNow();
-        service = null;
-    }
-
     @BeforeEach
     public void beforeEach() throws Exception {
         SchemaManager schemaManager =
@@ -126,7 +109,7 @@ public class FileStoreSourceSplitReaderTest {
     }
 
     private void innerTestOnce(boolean valueCountMode, int skip) throws Exception {
-        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
         FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
                 createReader(
                         valueCountMode ? rw.createReadWithValueCount() : rw.createReadWithKey(),
@@ -170,7 +153,7 @@ public class FileStoreSourceSplitReaderTest {
 
     @Test
     public void testPrimaryKeyWithDelete() throws Exception {
-        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
         FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
                 createReader(rw.createReadWithKey(), null);
 
@@ -213,7 +196,7 @@ public class FileStoreSourceSplitReaderTest {
 
     @Test
     public void testMultipleBatchInSplit() throws Exception {
-        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
         FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
                 createReader(rw.createReadWithKey(), null);
 
@@ -250,7 +233,7 @@ public class FileStoreSourceSplitReaderTest {
 
     @Test
     public void testRestore() throws Exception {
-        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
         FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
                 createReader(rw.createReadWithKey(), null);
 
@@ -277,7 +260,7 @@ public class FileStoreSourceSplitReaderTest {
 
     @Test
     public void testRestoreMultipleBatchInSplit() throws Exception {
-        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
         FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
                 createReader(rw.createReadWithKey(), null);
 
@@ -309,7 +292,7 @@ public class FileStoreSourceSplitReaderTest {
 
     @Test
     public void testMultipleSplits() throws Exception {
-        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
         FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
                 createReader(rw.createReadWithKey(), null);
 
@@ -348,7 +331,7 @@ public class FileStoreSourceSplitReaderTest {
 
     @Test
     public void testNoSplit() throws Exception {
-        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
         FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
                 createReader(rw.createReadWithKey(), null);
         assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining");
@@ -357,7 +340,7 @@ public class FileStoreSourceSplitReaderTest {
 
     @Test
     public void testLimit() throws Exception {
-        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString(), service);
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
         FileStoreSourceSplitReader<RecordAndPosition<RowData>> reader =
                 createReader(rw.createReadWithKey(), 2L);
 
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 397bdf2f..294e9d9c 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -51,14 +51,12 @@ import org.apache.flink.table.store.types.DataField;
 import org.apache.flink.table.store.types.IntType;
 import org.apache.flink.table.store.types.RowKind;
 import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.store.utils.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
 import static java.util.Collections.singletonList;
@@ -97,10 +95,9 @@ public class TestChangelogDataReadWrite {
     private final Path tablePath;
     private final FileStorePathFactory pathFactory;
     private final SnapshotManager snapshotManager;
-    private final ExecutorService service;
     private final String commitUser;
 
-    public TestChangelogDataReadWrite(String root, ExecutorService service) {
+    public TestChangelogDataReadWrite(String root) {
         this.avro = FileFormat.fromIdentifier("avro", new Options());
         this.tablePath = new Path(root);
         this.pathFactory =
@@ -110,7 +107,6 @@ public class TestChangelogDataReadWrite {
                         "default",
                         CoreOptions.FILE_FORMAT.defaultValue());
         this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
-        this.service = service;
         this.commitUser = UUID.randomUUID().toString();
     }
 
@@ -160,8 +156,6 @@ public class TestChangelogDataReadWrite {
 
     public List<DataFileMeta> writeFiles(
             BinaryRow partition, int bucket, List<Tuple2<Long, Long>> kvs) throws Exception {
-        Preconditions.checkNotNull(
-                service, "ExecutorService must be provided if writeFiles is needed");
         RecordWriter<KeyValue> writer = createMergeTreeWriter(partition, bucket);
         for (Tuple2<Long, Long> tuple2 : kvs) {
             writer.write(
@@ -194,7 +188,7 @@ public class TestChangelogDataReadWrite {
                                 null, // not used, we only create an empty writer
                                 options,
                                 EXTRACTOR)
-                        .createEmptyWriterContainer(partition, bucket, service)
+                        .createWriterContainer(partition, bucket, true)
                         .writer;
         ((MemoryOwner) writer)
                 .setMemoryPool(