Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/29 02:42:29 UTC

[flink-table-store] branch master updated: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new b647508  [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl
b647508 is described below

commit b647508352eec54051550e2da9852fc144579824
Author: tsreaper <ts...@gmail.com>
AuthorDate: Sat Jan 29 10:40:29 2022 +0800

    [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl
    
    This closes #14
---
 .../table/store/file/manifest/ManifestFile.java    |  53 ++++-
 .../table/store/file/manifest/ManifestList.java    |  43 +++-
 .../store/file/mergetree/MergeTreeFactory.java     |  64 ++++++
 .../table/store/file/mergetree/sst/SstFile.java    |  57 ++++-
 .../store/file/mergetree/sst/SstPathFactory.java   |  10 +-
 .../store/file/operation/FileStoreCommitImpl.java  |  30 ++-
 .../table/store/file/operation/FileStoreScan.java  |   3 +
 .../store/file/operation/FileStoreScanImpl.java    | 155 ++++++++++---
 .../store/file/operation/FileStoreWriteImpl.java   |  71 ++++++
 .../file/stats/FieldStatsArraySerializer.java      |  22 +-
 .../store/file/stats/FieldStatsCollector.java      |  17 +-
 .../store/file/utils/FileStorePathFactory.java     |  22 +-
 .../flink/table/store/file/utils/FileUtils.java    |  18 ++
 .../file/utils/RowDataToObjectArrayConverter.java  |  55 +++++
 .../store/file/manifest/ManifestFileMetaTest.java  |  13 +-
 .../store/file/manifest/ManifestFileTest.java      |  13 +-
 .../store/file/manifest/ManifestListTest.java      |   3 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  31 +--
 .../store/file/mergetree/sst/SstFileTest.java      |  23 +-
 .../file/operation/FileStoreCommitTestBase.java    | 113 ++-------
 .../store/file/operation/FileStoreScanTest.java    | 255 +++++++++++++++++++++
 .../store/file/operation/OperationTestUtils.java   | 190 +++++++++++++++
 .../store/file/operation/TestCommitThread.java     | 115 +---------
 23 files changed, 1033 insertions(+), 343 deletions(-)
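
The new scan filters compose fluently; FileStoreWriteImpl below restores a writer's
files with exactly this chain. A minimal sketch (latestSnapshotId, partition and
bucket are placeholders for values the caller already holds):

    List<ManifestEntry> files =
            scan.withSnapshot(latestSnapshotId)
                    .withPartitionFilter(Collections.singletonList(partition))
                    .withBucket(bucket)
                    .plan()
                    .files();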

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 1b2bb92..77b3ba7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -47,17 +47,16 @@ public class ManifestFile {
     private final BulkWriter.Factory<RowData> writerFactory;
     private final FileStorePathFactory pathFactory;
 
-    public ManifestFile(
+    private ManifestFile(
             RowType partitionType,
-            RowType keyType,
-            RowType rowType,
-            FileFormat fileFormat,
+            ManifestEntrySerializer serializer,
+            BulkFormat<RowData, FileSourceSplit> readerFactory,
+            BulkWriter.Factory<RowData> writerFactory,
             FileStorePathFactory pathFactory) {
         this.partitionType = partitionType;
-        this.serializer = new ManifestEntrySerializer(partitionType, keyType, rowType);
-        RowType entryType = ManifestEntry.schema(partitionType, keyType, rowType);
-        this.readerFactory = fileFormat.createReaderFactory(entryType);
-        this.writerFactory = fileFormat.createWriterFactory(entryType);
+        this.serializer = serializer;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
         this.pathFactory = pathFactory;
     }
 
@@ -123,4 +122,42 @@ public class ManifestFile {
     public void delete(String fileName) {
         FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
     }
+
+    /**
+     * Creator of {@link ManifestFile}. It reuses {@link BulkFormat} and {@link BulkWriter.Factory}
+     * from {@link FileFormat}.
+     */
+    public static class Factory {
+
+        private final RowType partitionType;
+        private final RowType keyType;
+        private final RowType rowType;
+        private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+        private final BulkWriter.Factory<RowData> writerFactory;
+        private final FileStorePathFactory pathFactory;
+
+        public Factory(
+                RowType partitionType,
+                RowType keyType,
+                RowType rowType,
+                FileFormat fileFormat,
+                FileStorePathFactory pathFactory) {
+            this.partitionType = partitionType;
+            this.keyType = keyType;
+            this.rowType = rowType;
+            RowType entryType = ManifestEntry.schema(partitionType, keyType, rowType);
+            this.readerFactory = fileFormat.createReaderFactory(entryType);
+            this.writerFactory = fileFormat.createWriterFactory(entryType);
+            this.pathFactory = pathFactory;
+        }
+
+        public ManifestFile create() {
+            return new ManifestFile(
+                    partitionType,
+                    new ManifestEntrySerializer(partitionType, keyType, rowType),
+                    readerFactory,
+                    writerFactory,
+                    pathFactory);
+        }
+    }
 }
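
The ManifestFile constructor is now private: a Factory is built once with the
(reusable) reader and writer formats, and create() hands each caller its own
ManifestEntrySerializer. A sketch mirroring the test changes later in this diff:

    ManifestFile manifestFile =
            new ManifestFile.Factory(partitionType, keyType, rowType, fileFormat, pathFactory)
                    .create();
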
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 1b008dd..8a9707b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -45,12 +45,14 @@ public class ManifestList {
     private final BulkWriter.Factory<RowData> writerFactory;
     private final FileStorePathFactory pathFactory;
 
-    public ManifestList(
-            RowType partitionType, FileFormat fileFormat, FileStorePathFactory pathFactory) {
-        this.serializer = new ManifestFileMetaSerializer(partitionType);
-        RowType metaType = ManifestFileMeta.schema(partitionType);
-        this.readerFactory = fileFormat.createReaderFactory(metaType);
-        this.writerFactory = fileFormat.createWriterFactory(metaType);
+    private ManifestList(
+            ManifestFileMetaSerializer serializer,
+            BulkFormat<RowData, FileSourceSplit> readerFactory,
+            BulkWriter.Factory<RowData> writerFactory,
+            FileStorePathFactory pathFactory) {
+        this.serializer = serializer;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
         this.pathFactory = pathFactory;
     }
 
@@ -97,4 +99,33 @@ public class ManifestList {
     public void delete(String fileName) {
         FileUtils.deleteOrWarn(pathFactory.toManifestListPath(fileName));
     }
+
+    /**
+     * Creator of {@link ManifestList}. It reuses {@link BulkFormat} and {@link BulkWriter.Factory}
+     * from {@link FileFormat}.
+     */
+    public static class Factory {
+
+        private final RowType partitionType;
+        private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+        private final BulkWriter.Factory<RowData> writerFactory;
+        private final FileStorePathFactory pathFactory;
+
+        public Factory(
+                RowType partitionType, FileFormat fileFormat, FileStorePathFactory pathFactory) {
+            this.partitionType = partitionType;
+            RowType metaType = ManifestFileMeta.schema(partitionType);
+            this.readerFactory = fileFormat.createReaderFactory(metaType);
+            this.writerFactory = fileFormat.createWriterFactory(metaType);
+            this.pathFactory = pathFactory;
+        }
+
+        public ManifestList create() {
+            return new ManifestList(
+                    new ManifestFileMetaSerializer(partitionType),
+                    readerFactory,
+                    writerFactory,
+                    pathFactory);
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeFactory.java
new file mode 100644
index 0000000..d24d517
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+
+/** Creates a new {@link MergeTree} for a specific partition and bucket. */
+public class MergeTreeFactory {
+
+    private final Comparator<RowData> keyComparator;
+    private final Accumulator accumulator;
+    private final MergeTreeOptions mergeTreeOptions;
+    private final SstFile.Factory sstFileFactory;
+
+    public MergeTreeFactory(
+            RowType keyType,
+            RowType rowType,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            MergeTreeOptions mergeTreeOptions) {
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.mergeTreeOptions = mergeTreeOptions;
+        this.sstFileFactory =
+                new SstFile.Factory(
+                        keyType, rowType, fileFormat, pathFactory, mergeTreeOptions.targetFileSize);
+    }
+
+    public MergeTree create(BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        return new MergeTree(
+                mergeTreeOptions,
+                sstFileFactory.create(partition, bucket),
+                keyComparator,
+                compactExecutor,
+                accumulator);
+    }
+}
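
MergeTreeFactory captures the comparator, accumulator and options once, so each
writer only needs its partition and bucket. A sketch of the intended use, matching
FileStoreWriteImpl below (the executor and the empty restore list are assumptions):

    ExecutorService compactExecutor = Executors.newSingleThreadExecutor();
    MergeTree mergeTree = mergeTreeFactory.create(partition, bucket, compactExecutor);
    RecordWriter writer = mergeTree.createWriter(Collections.emptyList());
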
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
index caae17b..058bf8a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.types.logical.RowType;
@@ -57,24 +58,22 @@ public class SstFile {
 
     private final RowType keyType;
     private final RowType valueType;
-
     private final BulkFormat<RowData, FileSourceSplit> readerFactory;
     private final BulkWriter.Factory<RowData> writerFactory;
     private final SstPathFactory pathFactory;
     private final long suggestedFileSize;
 
-    public SstFile(
+    private SstFile(
             RowType keyType,
             RowType valueType,
-            FileFormat fileFormat,
+            BulkFormat<RowData, FileSourceSplit> readerFactory,
+            BulkWriter.Factory<RowData> writerFactory,
             SstPathFactory pathFactory,
             long suggestedFileSize) {
         this.keyType = keyType;
         this.valueType = valueType;
-
-        RowType recordType = KeyValue.schema(keyType, valueType);
-        this.readerFactory = fileFormat.createReaderFactory(recordType);
-        this.writerFactory = fileFormat.createWriterFactory(recordType);
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
         this.pathFactory = pathFactory;
         this.suggestedFileSize = suggestedFileSize;
     }
@@ -88,6 +87,11 @@ public class SstFile {
     }
 
     @VisibleForTesting
+    public SstPathFactory pathFactory() {
+        return pathFactory;
+    }
+
+    @VisibleForTesting
     public long suggestedFileSize() {
         return suggestedFileSize;
     }
@@ -277,4 +281,43 @@ public class SstFile {
                     level);
         }
     }
+
+    /**
+     * Creator of {@link SstFile}. It reuses {@link BulkFormat} and {@link BulkWriter.Factory} from
+     * {@link FileFormat}.
+     */
+    public static class Factory {
+
+        private final RowType keyType;
+        private final RowType valueType;
+        private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+        private final BulkWriter.Factory<RowData> writerFactory;
+        private final FileStorePathFactory pathFactory;
+        private final long suggestedFileSize;
+
+        public Factory(
+                RowType keyType,
+                RowType valueType,
+                FileFormat fileFormat,
+                FileStorePathFactory pathFactory,
+                long suggestedFileSize) {
+            this.keyType = keyType;
+            this.valueType = valueType;
+            RowType recordType = KeyValue.schema(keyType, valueType);
+            this.readerFactory = fileFormat.createReaderFactory(recordType);
+            this.writerFactory = fileFormat.createWriterFactory(recordType);
+            this.pathFactory = pathFactory;
+            this.suggestedFileSize = suggestedFileSize;
+        }
+
+        public SstFile create(BinaryRowData partition, int bucket) {
+            return new SstFile(
+                    keyType,
+                    valueType,
+                    readerFactory,
+                    writerFactory,
+                    pathFactory.createSstPathFactory(partition, bucket),
+                    suggestedFileSize);
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java
index 8634098..ccc8ec6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java
@@ -21,25 +21,29 @@ package org.apache.flink.table.store.file.mergetree.sst;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** Factory which produces new {@link Path}s for sst files. */
+@ThreadSafe
 public class SstPathFactory {
 
     private final Path bucketDir;
     private final String uuid;
 
-    private int pathCount;
+    private final AtomicInteger pathCount;
 
     public SstPathFactory(Path root, String partition, int bucket) {
         this.bucketDir = new Path(root + "/" + partition + "/bucket-" + bucket);
         this.uuid = UUID.randomUUID().toString();
 
-        this.pathCount = 0;
+        this.pathCount = new AtomicInteger(0);
     }
 
     public Path newPath() {
-        return new Path(bucketDir + "/sst-" + uuid + "-" + (pathCount++));
+        return new Path(bucketDir + "/sst-" + uuid + "-" + pathCount.getAndIncrement());
     }
 
     public Path toPath(String fileName) {
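
Replacing the plain int counter with an AtomicInteger makes newPath() safe to call
from concurrent writers: getAndIncrement() hands every caller a unique suffix. A
hypothetical stress sketch (paths, pool size and counts are illustrative only):

    SstPathFactory factory = new SstPathFactory(new Path("/tmp/store"), "dt=20220101", 0);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    Set<Path> paths = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < 100; i++) {
        pool.submit(() -> paths.add(factory.newPath()));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES); // declared InterruptedException omitted
    // paths.size() == 100: no two callers ever observed the same counter value
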
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 8cb9fc0..22eeec6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -73,12 +73,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     private final String commitUser;
     private final ManifestCommittableSerializer committableSerializer;
-
     private final FileStorePathFactory pathFactory;
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
-    private final FileStoreOptions fileStoreOptions;
     private final FileStoreScan scan;
+    private final FileStoreOptions fileStoreOptions;
 
     @Nullable private Lock lock;
 
@@ -86,18 +85,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             String commitUser,
             ManifestCommittableSerializer committableSerializer,
             FileStorePathFactory pathFactory,
-            ManifestFile manifestFile,
-            ManifestList manifestList,
-            FileStoreOptions fileStoreOptions,
-            FileStoreScan scan) {
+            ManifestFile.Factory manifestFileFactory,
+            ManifestList.Factory manifestListFactory,
+            FileStoreScan scan,
+            FileStoreOptions fileStoreOptions) {
         this.commitUser = commitUser;
         this.committableSerializer = committableSerializer;
-
         this.pathFactory = pathFactory;
-        this.manifestFile = manifestFile;
-        this.manifestList = manifestList;
-        this.fileStoreOptions = fileStoreOptions;
+        this.manifestFile = manifestFileFactory.create();
+        this.manifestList = manifestListFactory.create();
         this.scan = scan;
+        this.fileStoreOptions = fileStoreOptions;
 
         this.lock = null;
     }
@@ -327,9 +325,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             return;
         }
 
+        List<BinaryRowData> changedPartitions =
+                changes.stream()
+                        .map(ManifestEntry::partition)
+                        .distinct()
+                        .collect(Collectors.toList());
         try {
-            // TODO use partition filter of scan when implemented
-            for (ManifestEntry entry : scan.withSnapshot(snapshotId).plan().files()) {
+            for (ManifestEntry entry :
+                    scan.withSnapshot(snapshotId)
+                            .withPartitionFilter(changedPartitions)
+                            .plan()
+                            .files()) {
                 removedFiles.remove(entry.identifier());
             }
         } catch (Throwable e) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 52edea3..2175f09 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -31,6 +32,8 @@ public interface FileStoreScan {
 
     FileStoreScan withPartitionFilter(Predicate predicate);
 
+    FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);
+
     FileStoreScan withKeyFilter(Predicate predicate);
 
     FileStoreScan withValueFilter(Predicate predicate);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index da72dbf..b75dc35 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -18,39 +18,58 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Or;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /** Default implementation of {@link FileStoreScan}. */
 public class FileStoreScanImpl implements FileStoreScan {
 
+    private final RowDataToObjectArrayConverter partitionConverter;
     private final FileStorePathFactory pathFactory;
-    private final ManifestFile manifestFile;
+    private final ManifestFile.Factory manifestFileFactory;
     private final ManifestList manifestList;
 
     private Long snapshotId;
     private List<ManifestFileMeta> manifests;
+    private Predicate partitionFilter;
+    private Predicate keyFilter;
+    private Predicate valueFilter;
+    private Integer bucket;
 
     public FileStoreScanImpl(
+            RowType partitionType,
             FileStorePathFactory pathFactory,
-            ManifestFile manifestFile,
-            ManifestList manifestList) {
+            ManifestFile.Factory manifestFileFactory,
+            ManifestList.Factory manifestListFactory) {
+        this.partitionConverter = new RowDataToObjectArrayConverter(partitionType);
         this.pathFactory = pathFactory;
-        this.manifestFile = manifestFile;
-        this.manifestList = manifestList;
+        this.manifestFileFactory = manifestFileFactory;
+        this.manifestList = manifestListFactory.create();
 
         this.snapshotId = null;
         this.manifests = new ArrayList<>();
@@ -58,22 +77,53 @@ public class FileStoreScanImpl implements FileStoreScan {
 
     @Override
     public FileStoreScan withPartitionFilter(Predicate predicate) {
-        throw new UnsupportedOperationException();
+        this.partitionFilter = predicate;
+        return this;
+    }
+
+    @Override
+    public FileStoreScan withPartitionFilter(List<BinaryRowData> partitions) {
+        Function<BinaryRowData, Predicate> partitionToPredicate =
+                p -> {
+                    List<Predicate> fieldPredicates = new ArrayList<>();
+                    Object[] partitionObjects = partitionConverter.convert(p);
+                    for (int i = 0; i < partitionConverter.getArity(); i++) {
+                        Literal l =
+                                new Literal(
+                                        partitionConverter.rowType().getTypeAt(i),
+                                        partitionObjects[i]);
+                        fieldPredicates.add(new Equal(i, l));
+                    }
+                    return fieldPredicates.stream().reduce(And::new).get();
+                };
+        Optional<Predicate> predicate =
+                partitions.stream()
+                        .filter(p -> p.getArity() > 0)
+                        .map(partitionToPredicate)
+                        .reduce(Or::new);
+        if (predicate.isPresent()) {
+            return withPartitionFilter(predicate.get());
+        } else {
+            return this;
+        }
     }
 
     @Override
     public FileStoreScan withKeyFilter(Predicate predicate) {
-        throw new UnsupportedOperationException();
+        this.keyFilter = predicate;
+        return this;
     }
 
     @Override
     public FileStoreScan withValueFilter(Predicate predicate) {
-        throw new UnsupportedOperationException();
+        this.valueFilter = predicate;
+        return this;
     }
 
     @Override
     public FileStoreScan withBucket(int bucket) {
-        throw new UnsupportedOperationException();
+        this.bucket = bucket;
+        return this;
     }
 
     @Override
@@ -109,34 +159,67 @@ public class FileStoreScanImpl implements FileStoreScan {
     }
 
     private List<ManifestEntry> scan() {
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : manifests) {
-            // TODO read each manifest file concurrently
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        Preconditions.checkState(
-                                map.containsKey(identifier),
-                                "Trying to delete file %s which is not previously added. "
-                                        + "Manifest might be corrupted.",
-                                identifier);
-                        map.remove(identifier);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+        List<ManifestEntry> entries;
+        try {
+            entries =
+                    FileUtils.COMMON_IO_FORK_JOIN_POOL
+                            .submit(
+                                    () ->
+                                            manifests
+                                                    .parallelStream()
+                                                    .filter(this::filterManifestFileMeta)
+                                                    .flatMap(m -> readManifestFileMeta(m).stream())
+                                                    .filter(this::filterManifestEntry)
+                                                    .collect(Collectors.toList()))
+                            .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Failed to read ManifestEntry list concurrently", e);
+        }
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+        for (ManifestEntry entry : entries) {
+            ManifestEntry.Identifier identifier = entry.identifier();
+            switch (entry.kind()) {
+                case ADD:
+                    Preconditions.checkState(
+                            !map.containsKey(identifier),
+                            "Trying to add file %s which is already added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.put(identifier, entry);
+                    break;
+                case DELETE:
+                    Preconditions.checkState(
+                            map.containsKey(identifier),
+                            "Trying to delete file %s which is not previously added. "
+                                    + "Manifest might be corrupted.",
+                            identifier);
+                    map.remove(identifier);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
             }
         }
         return new ArrayList<>(map.values());
     }
+
+    private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+        return partitionFilter == null
+                || partitionFilter.test(
+                        manifest.numAddedFiles() + manifest.numDeletedFiles(),
+                        manifest.partitionStats());
+    }
+
+    private boolean filterManifestEntry(ManifestEntry entry) {
+        // TODO apply key & value filter after field stats are collected in
+        //  SstFile.RollingFile#finish
+        return (partitionFilter == null
+                        || partitionFilter.test(partitionConverter.convert(entry.partition())))
+                && (bucket == null || entry.bucket() == bucket);
+    }
+
+    private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
+        return manifestFileFactory.create().read(manifest.fileName());
+    }
 }
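
withPartitionFilter(List<BinaryRowData>) turns each partition row into a conjunction
of per-field Equal predicates and ORs the partitions together. Hand-expanding that
reduction for an assumed two-field all-INT partition type (field types and values
are illustrative):

    // partitions (20220101, 0) and (20220101, 1) become:
    Predicate filter =
            new Or(
                    new And(
                            new Equal(0, new Literal(new IntType(), 20220101)),
                            new Equal(1, new Literal(new IntType(), 0))),
                    new And(
                            new Equal(0, new Literal(new IntType(), 20220101)),
                            new Equal(1, new Literal(new IntType(), 1))));
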
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
new file mode 100644
index 0000000..5734166
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeFactory;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+    private final FileStorePathFactory pathFactory;
+    private final MergeTreeFactory mergeTreeFactory;
+    private final FileStoreScan scan;
+
+    public FileStoreWriteImpl(
+            FileStorePathFactory pathFactory,
+            MergeTreeFactory mergeTreeFactory,
+            FileStoreScan scan) {
+        this.pathFactory = pathFactory;
+        this.mergeTreeFactory = mergeTreeFactory;
+        this.scan = scan;
+    }
+
+    @Override
+    public RecordWriter createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return createEmptyWriter(partition, bucket, compactExecutor);
+        } else {
+            MergeTree mergeTree = mergeTreeFactory.create(partition, bucket, compactExecutor);
+            return mergeTree.createWriter(
+                    scan.withSnapshot(latestSnapshotId)
+                            .withPartitionFilter(Collections.singletonList(partition))
+                            .withBucket(bucket).plan().files().stream()
+                            .map(ManifestEntry::file)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    @Override
+    public RecordWriter createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        MergeTree mergeTree = mergeTreeFactory.create(partition, bucket, compactExecutor);
+        return mergeTree.createWriter(Collections.emptyList());
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index b1a2aa2..2ddf607 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -29,18 +30,17 @@ import org.apache.flink.table.types.logical.RowType;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.IntStream;
 
 /** Serializer for array of {@link FieldStats}. */
 public class FieldStatsArraySerializer extends ObjectSerializer<FieldStats[]> {
 
     private static final long serialVersionUID = 1L;
 
-    private final RowData.FieldGetter[] fieldGetters;
+    private final RowDataToObjectArrayConverter converter;
 
     public FieldStatsArraySerializer(RowType rowType) {
         super(schema(rowType));
-        this.fieldGetters = createFieldGetters(toAllFieldsNullableRowType(rowType));
+        this.converter = new RowDataToObjectArrayConverter(toAllFieldsNullableRowType(rowType));
     }
 
     @Override
@@ -59,18 +59,16 @@ public class FieldStatsArraySerializer extends ObjectSerializer<FieldStats[]> {
 
     @Override
     public FieldStats[] fromRow(RowData row) {
-        int rowFieldCount = fieldGetters.length;
+        int rowFieldCount = converter.getArity();
         RowData minValues = row.getRow(0, rowFieldCount);
+        Object[] minValueObjects = converter.convert(minValues);
         RowData maxValues = row.getRow(1, rowFieldCount);
+        Object[] maxValueObjects = converter.convert(maxValues);
         long[] nullValues = row.getArray(2).toLongArray();
 
         FieldStats[] stats = new FieldStats[rowFieldCount];
         for (int i = 0; i < rowFieldCount; i++) {
-            stats[i] =
-                    new FieldStats(
-                            fieldGetters[i].getFieldOrNull(minValues),
-                            fieldGetters[i].getFieldOrNull(maxValues),
-                            nullValues[i]);
+            stats[i] = new FieldStats(minValueObjects[i], maxValueObjects[i], nullValues[i]);
         }
         return stats;
     }
@@ -84,12 +82,6 @@ public class FieldStatsArraySerializer extends ObjectSerializer<FieldStats[]> {
         return new RowType(fields);
     }
 
-    public static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
-        return IntStream.range(0, rowType.getFieldCount())
-                .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
-                .toArray(RowData.FieldGetter[]::new);
-    }
-
     private static RowType toAllFieldsNullableRowType(RowType rowType) {
         // as stated in SstFile.RollingFile#finish, field stats are not collected currently so
         // min/max values are all nulls
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
index d7fa281..89ed0f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.store.file.stats;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
 
 /** Collector to extract statistics of each field from a series of records. */
 public class FieldStatsCollector {
@@ -28,15 +28,14 @@ public class FieldStatsCollector {
     private final Object[] minValues;
     private final Object[] maxValues;
     private final long[] nullCounts;
-
-    private final RowData.FieldGetter[] fieldGetters;
+    private final RowDataToObjectArrayConverter converter;
 
     public FieldStatsCollector(RowType rowType) {
         int numFields = rowType.getFieldCount();
         this.minValues = new Object[numFields];
         this.maxValues = new Object[numFields];
         this.nullCounts = new long[numFields];
-        this.fieldGetters = FieldStatsArraySerializer.createFieldGetters(rowType);
+        this.converter = new RowDataToObjectArrayConverter(rowType);
     }
 
     /**
@@ -46,13 +45,9 @@ public class FieldStatsCollector {
      * the collector.
      */
     public void collect(RowData row) {
-        Preconditions.checkArgument(
-                fieldGetters.length == row.getArity(),
-                "Expecting row data with %d fields but found row data with %d fields",
-                fieldGetters.length,
-                row.getArity());
+        Object[] objects = converter.convert(row);
         for (int i = 0; i < row.getArity(); i++) {
-            Object obj = fieldGetters[i].getFieldOrNull(row);
+            Object obj = objects[i];
             if (obj == null) {
                 nullCounts[i]++;
                 continue;
@@ -73,7 +68,7 @@ public class FieldStatsCollector {
     }
 
     public FieldStats[] extract() {
-        FieldStats[] stats = new FieldStats[fieldGetters.length];
+        FieldStats[] stats = new FieldStats[nullCounts.length];
         for (int i = 0; i < stats.length; i++) {
             stats[i] = new FieldStats(minValues[i], maxValues[i], nullCounts[i]);
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 9aa9844..679e367 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -37,11 +37,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** Factory which produces {@link Path}s for each type of file. */
+@ThreadSafe
 public class FileStorePathFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(FileStorePathFactory.class);
@@ -51,8 +54,8 @@ public class FileStorePathFactory {
     private final String uuid;
     private final RowDataPartitionComputer partitionComputer;
 
-    private int manifestFileCount;
-    private int manifestListCount;
+    private final AtomicInteger manifestFileCount;
+    private final AtomicInteger manifestListCount;
 
     public FileStorePathFactory(Path root) {
         this(root, RowType.of(), FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.defaultValue());
@@ -73,16 +76,22 @@ public class FileStorePathFactory {
                                 .toArray(DataType[]::new),
                         partitionColumns);
 
-        this.manifestFileCount = 0;
-        this.manifestListCount = 0;
+        this.manifestFileCount = new AtomicInteger(0);
+        this.manifestListCount = new AtomicInteger(0);
     }
 
     public Path newManifestFile() {
-        return new Path(root + "/manifest/manifest-" + uuid + "-" + (manifestFileCount++));
+        return new Path(
+                root + "/manifest/manifest-" + uuid + "-" + manifestFileCount.getAndIncrement());
     }
 
     public Path newManifestList() {
-        return new Path(root + "/manifest/manifest-list-" + uuid + "-" + (manifestListCount++));
+        return new Path(
+                root
+                        + "/manifest/manifest-list-"
+                        + uuid
+                        + "-"
+                        + manifestListCount.getAndIncrement());
     }
 
     public Path toManifestFilePath(String manifestFileName) {
@@ -105,6 +114,7 @@ public class FileStorePathFactory {
         return new SstPathFactory(root, getPartitionString(partition), bucket);
     }
 
+    /** IMPORTANT: This method is NOT THREAD SAFE. */
     public String getPartitionString(BinaryRowData partition) {
         return PartitionPathUtils.generatePartitionPath(
                 partitionComputer.generatePartValues(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
index 33f78b5..50cce1d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -39,6 +39,8 @@ import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
 
 /** Utils for file reading and writing. */
 public class FileUtils {
@@ -51,6 +53,22 @@ public class FileUtils {
         DEFAULT_READER_CONFIG.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
     }
 
+    public static final ForkJoinPool COMMON_IO_FORK_JOIN_POOL;
+
+    // if we want to name threads in the fork join pool we need all these
+    // see https://stackoverflow.com/questions/34303094/
+    static {
+        ForkJoinPool.ForkJoinWorkerThreadFactory factory =
+                pool -> {
+                    ForkJoinWorkerThread worker =
+                            ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+                    worker.setName("file-store-common-io-" + worker.getPoolIndex());
+                    return worker;
+                };
+        COMMON_IO_FORK_JOIN_POOL =
+                new ForkJoinPool(Runtime.getRuntime().availableProcessors(), factory, null, false);
+    }
+
     public static <T> List<T> readListFromFile(
             Path path,
             ObjectSerializer<T> serializer,
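
The dedicated pool matters because a parallel stream started from inside a
ForkJoinPool task runs on that pool rather than the JVM-wide common pool; this is
how FileStoreScanImpl#scan above keeps manifest reading off the common pool. A
minimal sketch of the pattern (manifestNames and the mapping step are placeholders
for real manifest reads; checked exceptions from get() omitted):

    List<Integer> sizes =
            FileUtils.COMMON_IO_FORK_JOIN_POOL
                    .submit(() ->
                            manifestNames.parallelStream()
                                    .map(String::length) // placeholder work
                                    .collect(Collectors.toList()))
                    .get();
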
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java
new file mode 100644
index 0000000..0def1c1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.stream.IntStream;
+
+/** Converts {@link RowData} to an object array. */
+public class RowDataToObjectArrayConverter {
+
+    private final RowType rowType;
+    private final RowData.FieldGetter[] fieldGetters;
+
+    public RowDataToObjectArrayConverter(RowType rowType) {
+        this.rowType = rowType;
+        this.fieldGetters =
+                IntStream.range(0, rowType.getFieldCount())
+                        .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                        .toArray(RowData.FieldGetter[]::new);
+    }
+
+    public RowType rowType() {
+        return rowType;
+    }
+
+    public int getArity() {
+        return fieldGetters.length;
+    }
+
+    public Object[] convert(RowData rowData) {
+        Object[] result = new Object[fieldGetters.length];
+        for (int i = 0; i < fieldGetters.length; i++) {
+            result[i] = fieldGetters[i].getFieldOrNull(rowData);
+        }
+        return result;
+    }
+}
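
RowDataToObjectArrayConverter centralizes the FieldGetter boilerplate previously
duplicated in FieldStatsArraySerializer and FieldStatsCollector. A usage sketch
with an assumed single-INT row type:

    RowType rowType = RowType.of(new IntType());
    RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(rowType);
    Object[] fields = converter.convert(GenericRowData.of(42)); // -> [42]
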
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 027dda5..8ea0d88 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -135,12 +135,13 @@ public class ManifestFileMetaTest {
     }
 
     private ManifestFile createManifestFile(String path) {
-        return new ManifestFile(
-                PARTITION_TYPE,
-                KEY_TYPE,
-                ROW_TYPE,
-                avro,
-                new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"));
+        return new ManifestFile.Factory(
+                        PARTITION_TYPE,
+                        KEY_TYPE,
+                        ROW_TYPE,
+                        avro,
+                        new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"))
+                .create();
     }
 
     private void createData(List<ManifestFileMeta> input, List<ManifestFileMeta> expected) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index ee09d9a..99a8b40 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -91,12 +91,13 @@ public class ManifestFileTest {
         FileStorePathFactory pathFactory =
                 new FileStorePathFactory(
                         new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
-        return new ManifestFile(
-                TestKeyValueGenerator.PARTITION_TYPE,
-                TestKeyValueGenerator.KEY_TYPE,
-                TestKeyValueGenerator.ROW_TYPE,
-                avro,
-                pathFactory);
+        return new ManifestFile.Factory(
+                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        avro,
+                        pathFactory)
+                .create();
     }
 
     private void checkMetaIgnoringFileNameAndSize(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index e9df1c1..6b295c7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -94,6 +94,7 @@ public class ManifestListTest {
         FileStorePathFactory pathFactory =
                 new FileStorePathFactory(
                         new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
-        return new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory);
+        return new ManifestList.Factory(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory)
+                .create();
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 74cabd1..ca85674 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
@@ -31,7 +32,7 @@ import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.sst.SstFile;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
-import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.RecordWriter;
@@ -70,7 +71,7 @@ public class MergeTreeTest {
 
     private static ExecutorService service;
 
-    private SstPathFactory fileFactory;
+    private FileStorePathFactory pathFactory;
 
     private Comparator<RowData> comparator;
 
@@ -82,12 +83,11 @@ public class MergeTreeTest {
 
     @BeforeEach
     public void beforeEach() throws IOException {
-        fileFactory = new SstPathFactory(new Path(tempDir.toString()), null, 123);
-        Path bucketDir = fileFactory.toPath("ignore").getParent();
-        bucketDir.getFileSystem().mkdirs(bucketDir);
-
+        pathFactory = new FileStorePathFactory(new Path(tempDir.toString()));
         comparator = Comparator.comparingInt(o -> o.getInt(0));
         recreateWriter(1024 * 1024);
+        Path bucketDir = sstFile.pathFactory().toPath("ignore").getParent();
+        bucketDir.getFileSystem().mkdirs(bucketDir);
     }
 
     private void recreateWriter(long targetFileSize) {
@@ -97,12 +97,15 @@ public class MergeTreeTest {
         configuration.set(MergeTreeOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
         MergeTreeOptions options = new MergeTreeOptions(configuration);
         sstFile =
-                new SstFile(
-                        new RowType(singletonList(new RowType.RowField("k", new IntType()))),
-                        new RowType(singletonList(new RowType.RowField("v", new IntType()))),
-                        new SstFileTest.FlushingAvroFormat(),
-                        fileFactory,
-                        options.targetFileSize);
+                new SstFile.Factory(
+                                new RowType(
+                                        singletonList(new RowType.RowField("k", new IntType()))),
+                                new RowType(
+                                        singletonList(new RowType.RowField("v", new IntType()))),
+                                new SstFileTest.FlushingAvroFormat(),
+                                pathFactory,
+                                options.targetFileSize)
+                        .create(BinaryRowDataUtil.EMPTY_ROW, 0);
         mergeTree =
                 new MergeTree(options, sstFile, comparator, service, new DeduplicateAccumulator());
         writer = mergeTree.createWriter(new ArrayList<>());
@@ -160,7 +163,7 @@ public class MergeTreeTest {
         doTestWriteRead(6);
         List<SstFileMeta> files = writer.close();
         for (SstFileMeta file : files) {
-            Path path = fileFactory.toPath(file.fileName());
+            Path path = sstFile.pathFactory().toPath(file.fileName());
             assertThat(path.getFileSystem().exists(path)).isFalse();
         }
     }
@@ -233,7 +236,7 @@ public class MergeTreeTest {
         // assert records from increment compacted files
         assertRecords(expected, compactedFiles, true);
 
-        Path bucketDir = fileFactory.toPath("ignore").getParent();
+        Path bucketDir = sstFile.pathFactory().toPath("ignore").getParent();
         Set<String> files =
                 Arrays.stream(bucketDir.getFileSystem().listStatus(bucketDir))
                         .map(FileStatus::getPath)
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index b8fecd3..c09545c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -118,19 +118,18 @@ public class SstFileTest {
     }
 
     private SstFile createSstFile(String path) {
-        FileStorePathFactory fileStorePathFactory = new FileStorePathFactory(new Path(path));
-        SstPathFactory sstPathFactory =
-                fileStorePathFactory.createSstPathFactory(BinaryRowDataUtil.EMPTY_ROW, 0);
+        FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
-        return new SstFile(
-                TestKeyValueGenerator.KEY_TYPE,
-                TestKeyValueGenerator.ROW_TYPE,
-                // normal avro format will buffer changes in memory and we can't determine
-                // if the written file size is really larger than suggested, so we use a
-                // special avro format which flushes for every added element
-                flushingAvro,
-                sstPathFactory,
-                suggestedFileSize);
+        return new SstFile.Factory(
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        // the normal Avro format buffers changes in memory, so we
+                        // can't determine whether the written file size really exceeds
+                        // the suggested size; use a special Avro format that flushes
+                        // after every added element
+                        flushingAvro,
+                        pathFactory,
+                        suggestedFileSize)
+                .create(BinaryRowDataUtil.EMPTY_ROW, 0);
     }
 
     private void checkRollingFiles(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
index 0c8f678..0851ff4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
@@ -25,13 +25,8 @@ import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.manifest.ManifestFile;
-import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.sst.SstFile;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -71,7 +66,7 @@ public abstract class FileStoreCommitTestBase {
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
     }
 
-    protected abstract String getSchema();
+    protected abstract String getScheme();
 
     @RepeatedTest(10)
     public void testSingleCommitUser() throws Exception {
@@ -94,7 +89,7 @@ public abstract class FileStoreCommitTestBase {
                                 .collect(Collectors.toList()),
                 "input");
         Map<BinaryRowData, BinaryRowData> expected =
-                toKvMap(
+                OperationTestUtils.toKvMap(
                         data.values().stream()
                                 .flatMap(Collection::stream)
                                 .collect(Collectors.toList()));
@@ -114,7 +109,10 @@ public abstract class FileStoreCommitTestBase {
         for (int i = 0; i < numThreads; i++) {
             TestCommitThread thread =
                     new TestCommitThread(
-                            dataPerThread.get(i), createTestPathFactory(), createSafePathFactory());
+                            dataPerThread.get(i),
+                            OperationTestUtils.createPathFactory(getScheme(), tempDir.toString()),
+                            OperationTestUtils.createPathFactory(
+                                    TestAtomicRenameFileSystem.SCHEME, tempDir.toString()));
             thread.start();
             threads.add(thread);
         }
@@ -123,7 +121,16 @@ public abstract class FileStoreCommitTestBase {
         }
 
         // read actual data and compare
-        Map<BinaryRowData, BinaryRowData> actual = toKvMap(readKvsFromLatestSnapshot());
+        FileStorePathFactory safePathFactory =
+                OperationTestUtils.createPathFactory(
+                        TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+        Long snapshotId = safePathFactory.latestSnapshotId();
+        assertThat(snapshotId).isNotNull();
+        List<KeyValue> actualKvs =
+                OperationTestUtils.readKvsFromSnapshot(snapshotId, avro, safePathFactory);
+        gen.sort(actualKvs);
+        logData(() -> actualKvs, "raw read results");
+        Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
         logData(() -> kvMapToKvList(expected), "expected");
         logData(() -> kvMapToKvList(actual), "actual");
         assertThat(actual).isEqualTo(expected);
@@ -138,90 +145,6 @@ public abstract class FileStoreCommitTestBase {
         return data;
     }
 
-    private List<KeyValue> readKvsFromLatestSnapshot() throws IOException {
-        FileStorePathFactory pathFactory = createSafePathFactory();
-        Long latestSnapshotId = pathFactory.latestSnapshotId();
-        assertThat(latestSnapshotId).isNotNull();
-
-        ManifestFile manifestFile =
-                new ManifestFile(
-                        TestKeyValueGenerator.PARTITION_TYPE,
-                        TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE,
-                        avro,
-                        pathFactory);
-        ManifestList manifestList =
-                new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory);
-
-        List<KeyValue> kvs = new ArrayList<>();
-        List<ManifestEntry> entries =
-                new FileStoreScanImpl(pathFactory, manifestFile, manifestList)
-                        .withSnapshot(latestSnapshotId)
-                        .plan()
-                        .files();
-        for (ManifestEntry entry : entries) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Reading actual key-values from file " + entry.file().fileName());
-            }
-            SstFile sstFile =
-                    new SstFile(
-                            TestKeyValueGenerator.KEY_TYPE,
-                            TestKeyValueGenerator.ROW_TYPE,
-                            avro,
-                            pathFactory.createSstPathFactory(entry.partition(), 0),
-                            1024 * 1024 // not used
-                            );
-            RecordReaderIterator iterator =
-                    new RecordReaderIterator(sstFile.read(entry.file().fileName()));
-            while (iterator.hasNext()) {
-                kvs.add(
-                        iterator.next()
-                                .copy(
-                                        TestKeyValueGenerator.KEY_SERIALIZER,
-                                        TestKeyValueGenerator.ROW_SERIALIZER));
-            }
-        }
-
-        gen.sort(kvs);
-        logData(() -> kvs, "raw read results");
-        return kvs;
-    }
-
-    private Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
-        Map<BinaryRowData, BinaryRowData> result = new HashMap<>();
-        for (KeyValue kv : kvs) {
-            BinaryRowData key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key()).copy();
-            BinaryRowData value =
-                    TestKeyValueGenerator.ROW_SERIALIZER.toBinaryRow(kv.value()).copy();
-            switch (kv.valueKind()) {
-                case ADD:
-                    result.put(key, value);
-                    break;
-                case DELETE:
-                    result.remove(key);
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unknown value kind " + kv.valueKind().name());
-            }
-        }
-        return result;
-    }
-
-    private FileStorePathFactory createTestPathFactory() {
-        return new FileStorePathFactory(
-                new Path(getSchema() + "://" + tempDir.toString()),
-                TestKeyValueGenerator.PARTITION_TYPE,
-                "default");
-    }
-
-    private FileStorePathFactory createSafePathFactory() {
-        return new FileStorePathFactory(
-                new Path(TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString()),
-                TestKeyValueGenerator.PARTITION_TYPE,
-                "default");
-    }
-
     private List<KeyValue> kvMapToKvList(Map<BinaryRowData, BinaryRowData> map) {
         return map.entrySet().stream()
                 .map(e -> new KeyValue().replace(e.getKey(), -1, ValueKind.ADD, e.getValue()))
@@ -244,7 +167,7 @@ public abstract class FileStoreCommitTestBase {
     public static class WithTestAtomicRenameFileSystem extends FileStoreCommitTestBase {
 
         @Override
-        protected String getSchema() {
+        protected String getScheme() {
             return TestAtomicRenameFileSystem.SCHEME;
         }
     }
@@ -261,7 +184,7 @@ public abstract class FileStoreCommitTestBase {
         }
 
         @Override
-        protected String getSchema() {
+        protected String getScheme() {
             return FailingAtomicRenameFileSystem.SCHEME;
         }
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
new file mode 100644
index 0000000..251c853
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStoreScanImpl}. */
+public class FileStoreScanTest {
+
+    private static final int NUM_BUCKETS = 10;
+
+    private final FileFormat avro =
+            FileFormat.fromIdentifier(
+                    FileStoreScanTest.class.getClassLoader(), "avro", new Configuration());
+
+    private TestKeyValueGenerator gen;
+    @TempDir java.nio.file.Path tempDir;
+    private FileStorePathFactory pathFactory;
+
+    @BeforeEach
+    public void beforeEach() throws IOException {
+        gen = new TestKeyValueGenerator();
+        pathFactory =
+                OperationTestUtils.createPathFactory(
+                        TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+        Path root = new Path(tempDir.toString());
+        root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+    }
+
+    @RepeatedTest(10)
+    public void testWithPartitionFilter() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<KeyValue> data = generateData(random.nextInt(1000) + 1);
+        List<BinaryRowData> partitions =
+                data.stream()
+                        .map(gen::getPartition)
+                        .distinct()
+                        .collect(Collectors.toList());
+        Snapshot snapshot = writeData(data);
+
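+        // pick a random, possibly empty, subset of partitions to filter on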
+        Set<BinaryRowData> wantedPartitions = new HashSet<>();
+        for (int i = random.nextInt(partitions.size() + 1); i > 0; i--) {
+            wantedPartitions.add(partitions.get(random.nextInt(partitions.size())));
+        }
+
+        FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+        scan.withSnapshot(snapshot.id());
+        scan.withPartitionFilter(new ArrayList<>(wantedPartitions));
+
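+        // an empty partition filter keeps everything, so the full data set is expected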
+        Map<BinaryRowData, BinaryRowData> expected =
+                OperationTestUtils.toKvMap(
+                        wantedPartitions.isEmpty()
+                                ? data
+                                : data.stream()
+                                        .filter(
+                                                kv ->
+                                                        wantedPartitions.contains(
+                                                                gen.getPartition(kv)))
+                                        .collect(Collectors.toList()));
+        runTest(scan, snapshot.id(), expected);
+    }
+
+    @RepeatedTest(10)
+    public void testWithBucket() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<KeyValue> data = generateData(random.nextInt(1000) + 1);
+        Snapshot snapshot = writeData(data);
+
+        int wantedBucket = random.nextInt(NUM_BUCKETS);
+
+        FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+        scan.withSnapshot(snapshot.id());
+        scan.withBucket(wantedBucket);
+
+        Map<BinaryRowData, BinaryRowData> expected =
+                OperationTestUtils.toKvMap(
+                        data.stream()
+                                .filter(kv -> getBucket(kv) == wantedBucket)
+                                .collect(Collectors.toList()));
+        runTest(scan, snapshot.id(), expected);
+    }
+
+    @RepeatedTest(10)
+    public void testWithSnapshot() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numCommits = random.nextInt(10) + 1;
+        int wantedCommit = random.nextInt(numCommits);
+
+        List<Snapshot> snapshots = new ArrayList<>();
+        List<List<KeyValue>> allData = new ArrayList<>();
+        for (int i = 0; i < numCommits; i++) {
+            List<KeyValue> data = generateData(random.nextInt(100) + 1);
+            snapshots.add(writeData(data));
+            allData.add(data);
+        }
+        long wantedSnapshot = snapshots.get(wantedCommit).id();
+
+        FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+        scan.withSnapshot(wantedSnapshot);
+
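+        // snapshot i must contain all data committed up to and including commit i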
+        Map<BinaryRowData, BinaryRowData> expected =
+                OperationTestUtils.toKvMap(
+                        allData.subList(0, wantedCommit + 1).stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()));
+        runTest(scan, wantedSnapshot, expected);
+    }
+
+    @RepeatedTest(10)
+    public void testWithManifestList() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numCommits = random.nextInt(10) + 1;
+        for (int i = 0; i < numCommits; i++) {
+            List<KeyValue> data = generateData(random.nextInt(100) + 1);
+            writeData(data);
+        }
+
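+        // pick a random existing snapshot (ids start from 1) and scan its manifest list directly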
+        ManifestList manifestList =
+                new ManifestList.Factory(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory)
+                        .create();
+        long wantedSnapshot = random.nextLong(pathFactory.latestSnapshotId()) + 1;
+        List<ManifestFileMeta> wantedManifests =
+                manifestList.read(
+                        Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshot))
+                                .manifestList());
+
+        FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+        scan.withManifestList(wantedManifests);
+
+        List<KeyValue> expectedKvs =
+                OperationTestUtils.readKvsFromSnapshot(wantedSnapshot, avro, pathFactory);
+        gen.sort(expectedKvs);
+        Map<BinaryRowData, BinaryRowData> expected = OperationTestUtils.toKvMap(expectedKvs);
+        runTest(scan, null, expected);
+    }
+
+    private void runTest(
+            FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRowData, BinaryRowData> expected)
+            throws Exception {
+        FileStoreScan.Plan plan = scan.plan();
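+        // the plan reports the scanned snapshot id, or null when an explicit manifest list is given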
+        assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
+
+        List<KeyValue> actualKvs =
+                OperationTestUtils.readKvsFromManifestEntries(plan.files(), avro, pathFactory);
+        gen.sort(actualKvs);
+        Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private List<KeyValue> generateData(int numRecords) {
+        List<KeyValue> data = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            data.add(gen.next());
+        }
+        return data;
+    }
+
+    private Snapshot writeData(List<KeyValue> kvs) throws Exception {
+        FileStoreWrite write = OperationTestUtils.createWrite(avro, pathFactory);
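+        // maintain one writer per (partition, bucket) pair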
+        Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
+        for (KeyValue kv : kvs) {
+            BinaryRowData partition = gen.getPartition(kv);
+            int bucket = getBucket(kv);
+            writers.computeIfAbsent(partition, p -> new HashMap<>())
+                    .computeIfAbsent(
+                            bucket,
+                            b -> {
+                                ExecutorService service = Executors.newSingleThreadExecutor();
+                                return write.createWriter(partition, bucket, service);
+                            })
+                    .write(kv.valueKind(), kv.key(), kv.value());
+        }
+
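+        // gather each writer's increment into a single committable and commit once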
+        FileStoreCommit commit = OperationTestUtils.createCommit(avro, pathFactory);
+        ManifestCommittable committable = new ManifestCommittable();
+        for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
+                writers.entrySet()) {
+            for (Map.Entry<Integer, RecordWriter> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                Increment increment = entryWithBucket.getValue().prepareCommit();
+                committable.add(entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
+            }
+        }
+        commit.commit(committable, Collections.emptyMap());
+        writers.values().stream()
+                .flatMap(m -> m.values().stream())
+                .forEach(
+                        w -> {
+                            try {
+                                w.close();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        return Snapshot.fromPath(pathFactory.toSnapshotPath(pathFactory.latestSnapshotId()));
+    }
+
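+    // non-negative modulo, so negative key hash codes still map into [0, NUM_BUCKETS)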
+    private int getBucket(KeyValue kv) {
+        return (kv.key().hashCode() % NUM_BUCKETS + NUM_BUCKETS) % NUM_BUCKETS;
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
new file mode 100644
index 0000000..bce303b
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.MergeTreeFactory;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Utils for operation tests. */
+public class OperationTestUtils {
+
+    public static MergeTreeOptions getMergeTreeOptions(boolean forceCompact) {
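+        // small buffer and target file sizes force frequent flushes and file rolling in tests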
+        Configuration conf = new Configuration();
+        conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
+        conf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4 kb"));
+        conf.set(MergeTreeOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
+        conf.set(MergeTreeOptions.COMMIT_FORCE_COMPACT, forceCompact);
+        return new MergeTreeOptions(conf);
+    }
+
+    private static FileStoreOptions getFileStoreOptions() {
+        Configuration conf = new Configuration();
+        conf.set(FileStoreOptions.BUCKET, 1);
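+        // randomize the manifest target size (1 - 16 kb) to vary manifest merging across runs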
+        conf.set(
+                FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
+                MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
+        return new FileStoreOptions(conf);
+    }
+
+    public static FileStoreScan createScan(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        return new FileStoreScanImpl(
+                TestKeyValueGenerator.PARTITION_TYPE,
+                pathFactory,
+                createManifestFileFactory(fileFormat, pathFactory),
+                createManifestListFactory(fileFormat, pathFactory));
+    }
+
+    public static FileStoreCommit createCommit(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        ManifestCommittableSerializer serializer =
+                new ManifestCommittableSerializer(
+                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE);
+        ManifestFile.Factory testManifestFileFactory =
+                createManifestFileFactory(fileFormat, pathFactory);
+        ManifestList.Factory testManifestListFactory =
+                createManifestListFactory(fileFormat, pathFactory);
+        return new FileStoreCommitImpl(
+                UUID.randomUUID().toString(),
+                serializer,
+                pathFactory,
+                testManifestFileFactory,
+                testManifestListFactory,
+                createScan(fileFormat, pathFactory),
+                getFileStoreOptions());
+    }
+
+    public static FileStoreWrite createWrite(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        MergeTreeFactory mergeTreeFactory =
+                new MergeTreeFactory(
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        TestKeyValueGenerator.KEY_COMPARATOR,
+                        new DeduplicateAccumulator(),
+                        fileFormat,
+                        pathFactory,
+                        OperationTestUtils.getMergeTreeOptions(false));
+        return new FileStoreWriteImpl(
+                pathFactory, mergeTreeFactory, createScan(fileFormat, pathFactory));
+    }
+
+    public static FileStorePathFactory createPathFactory(String scheme, String root) {
+        return new FileStorePathFactory(
+                new Path(scheme + "://" + root), TestKeyValueGenerator.PARTITION_TYPE, "default");
+    }
+
+    private static ManifestFile.Factory createManifestFileFactory(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        return new ManifestFile.Factory(
+                TestKeyValueGenerator.PARTITION_TYPE,
+                TestKeyValueGenerator.KEY_TYPE,
+                TestKeyValueGenerator.ROW_TYPE,
+                fileFormat,
+                pathFactory);
+    }
+
+    private static ManifestList.Factory createManifestListFactory(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        return new ManifestList.Factory(
+                TestKeyValueGenerator.PARTITION_TYPE, fileFormat, pathFactory);
+    }
+
+    public static List<KeyValue> readKvsFromSnapshot(
+            long snapshotId, FileFormat fileFormat, FileStorePathFactory pathFactory)
+            throws IOException {
+        List<ManifestEntry> entries =
+                createScan(fileFormat, pathFactory).withSnapshot(snapshotId).plan().files();
+        return readKvsFromManifestEntries(entries, fileFormat, pathFactory);
+    }
+
+    public static List<KeyValue> readKvsFromManifestEntries(
+            List<ManifestEntry> entries, FileFormat fileFormat, FileStorePathFactory pathFactory)
+            throws IOException {
+        List<KeyValue> kvs = new ArrayList<>();
+        SstFile.Factory sstFileFactory =
+                new SstFile.Factory(
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        fileFormat,
+                        pathFactory,
+                        1024 * 1024 // not used
+                        );
+        for (ManifestEntry entry : entries) {
+            SstFile sstFile = sstFileFactory.create(entry.partition(), entry.bucket());
+            RecordReaderIterator iterator =
+                    new RecordReaderIterator(sstFile.read(entry.file().fileName()));
+            while (iterator.hasNext()) {
+                kvs.add(
+                        iterator.next()
+                                .copy(
+                                        TestKeyValueGenerator.KEY_SERIALIZER,
+                                        TestKeyValueGenerator.ROW_SERIALIZER));
+            }
+        }
+        return kvs;
+    }
+
+    public static Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
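+        // replay ADDs and DELETEs in order to materialize the final key -> value view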
+        Map<BinaryRowData, BinaryRowData> result = new HashMap<>();
+        for (KeyValue kv : kvs) {
+            BinaryRowData key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key()).copy();
+            BinaryRowData value =
+                    TestKeyValueGenerator.ROW_SERIALIZER.toBinaryRow(kv.value()).copy();
+            switch (kv.valueKind()) {
+                case ADD:
+                    result.put(key, value);
+                    break;
+                case DELETE:
+                    result.remove(key);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + kv.valueKind().name());
+            }
+        }
+        return result;
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 334acfd..5d96cf5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -19,22 +19,11 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.manifest.ManifestFile;
-import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.MergeTree;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
-import org.apache.flink.table.store.file.mergetree.sst.SstFile;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 
 import org.slf4j.Logger;
@@ -45,37 +34,19 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
 
 /** Testing {@link Thread}s to perform concurrent commits. */
 public class TestCommitThread extends Thread {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestCommitThread.class);
 
-    private static final MergeTreeOptions MERGE_TREE_OPTIONS;
-    private static final long SUGGESTED_SST_FILE_SIZE = 1024;
-
-    static {
-        Configuration mergeTreeConf = new Configuration();
-        mergeTreeConf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
-        mergeTreeConf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4 kb"));
-        MERGE_TREE_OPTIONS = new MergeTreeOptions(mergeTreeConf);
-    }
-
-    private final FileFormat avro =
-            FileFormat.fromIdentifier(
-                    FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
-
     private final Map<BinaryRowData, List<KeyValue>> data;
-    private final FileStorePathFactory safePathFactory;
-
     private final Map<BinaryRowData, MergeTreeWriter> writers;
 
-    private final FileStoreScan scan;
+    private final FileStoreWrite write;
     private final FileStoreCommit commit;
 
     public TestCommitThread(
@@ -83,53 +54,15 @@ public class TestCommitThread extends Thread {
             FileStorePathFactory testPathFactory,
             FileStorePathFactory safePathFactory) {
         this.data = data;
-        this.safePathFactory = safePathFactory;
-
         this.writers = new HashMap<>();
 
-        this.scan =
-                new FileStoreScanImpl(
-                        safePathFactory,
-                        createManifestFile(safePathFactory),
-                        createManifestList(safePathFactory));
-
-        ManifestCommittableSerializer serializer =
-                new ManifestCommittableSerializer(
-                        TestKeyValueGenerator.PARTITION_TYPE,
-                        TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE);
-        ManifestFile testManifestFile = createManifestFile(testPathFactory);
-        ManifestList testManifestList = createManifestList(testPathFactory);
-        Configuration fileStoreConf = new Configuration();
-        fileStoreConf.set(FileStoreOptions.BUCKET, 1);
-        fileStoreConf.set(
-                FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
-                MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
-        FileStoreOptions fileStoreOptions = new FileStoreOptions(fileStoreConf);
-        FileStoreScanImpl testScan =
-                new FileStoreScanImpl(testPathFactory, testManifestFile, testManifestList);
-        this.commit =
-                new FileStoreCommitImpl(
-                        UUID.randomUUID().toString(),
-                        serializer,
-                        testPathFactory,
-                        testManifestFile,
-                        testManifestList,
-                        fileStoreOptions,
-                        testScan);
-    }
-
-    private ManifestFile createManifestFile(FileStorePathFactory pathFactory) {
-        return new ManifestFile(
-                TestKeyValueGenerator.PARTITION_TYPE,
-                TestKeyValueGenerator.KEY_TYPE,
-                TestKeyValueGenerator.ROW_TYPE,
-                avro,
-                pathFactory);
-    }
-
-    private ManifestList createManifestList(FileStorePathFactory pathFactory) {
-        return new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory);
+        FileFormat avro =
+                FileFormat.fromIdentifier(
+                        TestCommitThread.class.getClassLoader(),
+                        "avro",
+                        new Configuration());
+        this.write = OperationTestUtils.createWrite(avro, safePathFactory);
+        this.commit = OperationTestUtils.createCommit(avro, testPathFactory);
     }
 
     @Override
@@ -155,11 +88,7 @@ public class TestCommitThread extends Thread {
                     break;
                 } catch (Throwable e) {
                     if (LOG.isDebugEnabled()) {
-                        LOG.warn(
-                                "["
-                                        + Thread.currentThread().getName()
-                                        + "] Failed to commit because of exception, try again",
-                                e);
+                        LOG.warn("Failed to commit because of exception, try again", e);
                     }
                     writers.clear();
                     shouldCheckFilter = true;
@@ -214,13 +143,6 @@ public class TestCommitThread extends Thread {
     }
 
     private MergeTreeWriter createWriter(BinaryRowData partition) {
-        SstFile sstFile =
-                new SstFile(
-                        TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE,
-                        avro,
-                        safePathFactory.createSstPathFactory(partition, 0),
-                        SUGGESTED_SST_FILE_SIZE);
         ExecutorService service =
                 Executors.newSingleThreadExecutor(
                         r -> {
@@ -228,23 +150,6 @@ public class TestCommitThread extends Thread {
                             t.setName(Thread.currentThread().getName() + "-writer-service-pool");
                             return t;
                         });
-        MergeTree mergeTree =
-                new MergeTree(
-                        MERGE_TREE_OPTIONS,
-                        sstFile,
-                        TestKeyValueGenerator.KEY_COMPARATOR,
-                        service,
-                        new DeduplicateAccumulator());
-        Long latestSnapshotId = safePathFactory.latestSnapshotId();
-        if (latestSnapshotId == null) {
-            return (MergeTreeWriter) mergeTree.createWriter(Collections.emptyList());
-        } else {
-            return (MergeTreeWriter)
-                    mergeTree.createWriter(
-                            scan.withSnapshot(latestSnapshotId).plan().files().stream()
-                                    .filter(e -> partition.equals(e.partition()))
-                                    .map(ManifestEntry::file)
-                                    .collect(Collectors.toList()));
-        }
+        return (MergeTreeWriter) write.createWriter(partition, 0, service);
     }
 }