You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/01 09:49:34 UTC

[flink-table-store] branch master updated: [hotfix] Refactor OperationTestUtils into TestFileStore and remove unnecessary repeated tests

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 55385ae  [hotfix] Refactor OperationTestUtils into TestFileStore and remove unnecessary repeated tests
55385ae is described below

commit 55385ae8a556f87b22687a4f2a7ca93245ddcaba
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 1 17:49:27 2022 +0800

    [hotfix] Refactor OperationTestUtils into TestFileStore and remove unnecessary repeated tests
    
    This closes #27
---
 .../flink/table/store/file/FileStoreImpl.java      |   7 +-
 .../store/file/manifest/ManifestCommittable.java   |  12 +-
 .../store/file/operation/FileStoreCommitImpl.java  |  11 ++
 .../OperationTestUtils.java => TestFileStore.java} | 201 +++++++--------------
 .../store/file/operation/FileStoreCommitTest.java  |  62 ++++---
 .../store/file/operation/FileStoreExpireTest.java  |  58 +++---
 .../store/file/operation/FileStoreScanTest.java    |  62 +++----
 .../store/file/operation/TestCommitThread.java     |  15 +-
 8 files changed, 183 insertions(+), 245 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index 52857fa..2fa5c48 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
@@ -71,7 +72,8 @@ public class FileStoreImpl implements FileStore {
                         .generateRecordComparator("KeyComparator");
     }
 
-    private FileStorePathFactory pathFactory() {
+    @VisibleForTesting
+    public FileStorePathFactory pathFactory() {
         return new FileStorePathFactory(
                 options.path(), partitionType, options.partitionDefaultName());
     }
@@ -81,7 +83,8 @@ public class FileStoreImpl implements FileStore {
                 partitionType, keyType, valueType, options.manifestFormat(), pathFactory());
     }
 
-    private ManifestList.Factory manifestListFactory() {
+    @VisibleForTesting
+    public ManifestList.Factory manifestListFactory() {
         return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index a461c2d..339d1d9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -126,16 +126,16 @@ public class ManifestCommittable {
 
     @Override
     public String toString() {
-        return "ManifestCommittable{"
-                + "identifier="
+        return "ManifestCommittable { "
+                + "identifier = "
                 + identifier
-                + ", logOffsets="
+                + ", logOffsets = "
                 + logOffsets
-                + ", newFiles="
+                + ", newFiles =\n"
                 + filesToString(newFiles)
-                + ", compactBefore="
+                + ", compactBefore =\n"
                 + filesToString(compactBefore)
-                + ", compactAfter="
+                + ", compactAfter =\n"
                 + filesToString(compactAfter)
                 + '}';
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index fe20b45..d8f8211 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -378,6 +378,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         if (success) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        String.format(
+                                "Successfully commit snapshot #%d (path %s) by user %s "
+                                        + "with identifier %s and kind %s.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                commitUser,
+                                identifier,
+                                commitKind.name()));
+            }
             return true;
         }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
similarity index 56%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 0fe2fe0..38b4057 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -16,31 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.operation;
+package org.apache.flink.table.store.file;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.manifest.ManifestFile;
-import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
-import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.function.QuadFunction;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -55,154 +55,93 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
-/** Utils for operation tests. */
-public class OperationTestUtils {
+/** {@link FileStore} for tests. */
+public class TestFileStore extends FileStoreImpl {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestFileStore.class);
+
+    private final RowDataSerializer keySerializer;
+    private final RowDataSerializer valueSerializer;
 
-    public static MergeTreeOptions getMergeTreeOptions(boolean forceCompact) {
+    public static TestFileStore create(
+            String format,
+            String root,
+            int numBuckets,
+            RowType partitionType,
+            RowType keyType,
+            RowType valueType,
+            Accumulator accumulator) {
         Configuration conf = new Configuration();
+
         conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
         conf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4 kb"));
         conf.set(MergeTreeOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
-        conf.set(MergeTreeOptions.COMMIT_FORCE_COMPACT, forceCompact);
-        return new MergeTreeOptions(conf);
-    }
-
-    public static FileStoreScan createScan(
-            FileFormat fileFormat, FileStorePathFactory pathFactory) {
-        return new FileStoreScanImpl(
-                TestKeyValueGenerator.PARTITION_TYPE,
-                pathFactory,
-                createManifestFileFactory(fileFormat, pathFactory),
-                createManifestListFactory(fileFormat, pathFactory));
-    }
-
-    public static FileStoreCommit createCommit(
-            FileFormat fileFormat, FileStorePathFactory pathFactory) {
-        ManifestFile.Factory testManifestFileFactory =
-                createManifestFileFactory(fileFormat, pathFactory);
-        ManifestList.Factory testManifestListFactory =
-                createManifestListFactory(fileFormat, pathFactory);
-        return new FileStoreCommitImpl(
-                UUID.randomUUID().toString(),
-                TestKeyValueGenerator.PARTITION_TYPE,
-                pathFactory,
-                testManifestFileFactory,
-                testManifestListFactory,
-                createScan(fileFormat, pathFactory),
-                1,
-                MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"),
-                30);
-    }
-
-    public static FileStoreWrite createWrite(
-            FileFormat fileFormat, FileStorePathFactory pathFactory) {
-        return new FileStoreWriteImpl(
-                TestKeyValueGenerator.KEY_TYPE,
-                TestKeyValueGenerator.ROW_TYPE,
-                TestKeyValueGenerator.KEY_COMPARATOR,
-                new DeduplicateAccumulator(),
-                fileFormat,
-                pathFactory,
-                createScan(fileFormat, pathFactory),
-                getMergeTreeOptions(false));
-    }
 
-    public static FileStoreExpire createExpire(
-            int numRetained,
-            long millisRetained,
-            FileFormat fileFormat,
-            FileStorePathFactory pathFactory) {
-        return new FileStoreExpireImpl(
-                numRetained,
-                millisRetained,
-                pathFactory,
-                createManifestListFactory(fileFormat, pathFactory),
-                createScan(fileFormat, pathFactory));
-    }
+        conf.set(
+                FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
+                MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
 
-    public static FileStoreRead createRead(
-            FileFormat fileFormat, FileStorePathFactory pathFactory) {
-        return new FileStoreReadImpl(
-                TestKeyValueGenerator.KEY_TYPE,
-                TestKeyValueGenerator.ROW_TYPE,
-                TestKeyValueGenerator.KEY_COMPARATOR,
-                new DeduplicateAccumulator(),
-                fileFormat,
-                pathFactory);
-    }
+        conf.set(FileStoreOptions.FILE_FORMAT, format);
+        conf.set(FileStoreOptions.MANIFEST_FORMAT, format);
+        conf.set(FileStoreOptions.FILE_PATH, root);
+        conf.set(FileStoreOptions.BUCKET, numBuckets);
 
-    public static FileStorePathFactory createPathFactory(boolean failing, String root) {
-        String path =
-                failing
-                        ? FailingAtomicRenameFileSystem.getFailingPath(root)
-                        : TestAtomicRenameFileSystem.SCHEME + "://" + root;
-        return new FileStorePathFactory(
-                new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
+        return new TestFileStore(conf, partitionType, keyType, valueType, accumulator);
     }
 
-    private static ManifestFile.Factory createManifestFileFactory(
-            FileFormat fileFormat, FileStorePathFactory pathFactory) {
-        return new ManifestFile.Factory(
-                TestKeyValueGenerator.PARTITION_TYPE,
-                TestKeyValueGenerator.KEY_TYPE,
-                TestKeyValueGenerator.ROW_TYPE,
-                fileFormat,
-                pathFactory);
+    public TestFileStore(
+            Configuration conf,
+            RowType partitionType,
+            RowType keyType,
+            RowType valueType,
+            Accumulator accumulator) {
+        super(conf, UUID.randomUUID().toString(), partitionType, keyType, valueType, accumulator);
+        this.keySerializer = new RowDataSerializer(keyType);
+        this.valueSerializer = new RowDataSerializer(valueType);
     }
 
-    private static ManifestList.Factory createManifestListFactory(
-            FileFormat fileFormat, FileStorePathFactory pathFactory) {
-        return new ManifestList.Factory(
-                TestKeyValueGenerator.PARTITION_TYPE, fileFormat, pathFactory);
+    public FileStoreExpireImpl newExpire(int numRetained, long millisRetained) {
+        return new FileStoreExpireImpl(
+                numRetained, millisRetained, pathFactory(), manifestListFactory(), newScan());
     }
 
-    public static List<Snapshot> commitData(
+    public List<Snapshot> commitData(
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRowData> partitionCalculator,
-            Function<KeyValue, Integer> bucketCalculator,
-            FileFormat fileFormat,
-            FileStorePathFactory pathFactory)
+            Function<KeyValue, Integer> bucketCalculator)
             throws Exception {
         return commitDataImpl(
                 kvs,
                 partitionCalculator,
                 bucketCalculator,
-                fileFormat,
-                pathFactory,
                 FileStoreWrite::createWriter,
                 (commit, committable) -> commit.commit(committable, Collections.emptyMap()));
     }
 
-    public static List<Snapshot> overwriteData(
+    public List<Snapshot> overwriteData(
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRowData> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator,
-            FileFormat fileFormat,
-            FileStorePathFactory pathFactory,
             Map<String, String> partition)
             throws Exception {
         return commitDataImpl(
                 kvs,
                 partitionCalculator,
                 bucketCalculator,
-                fileFormat,
-                pathFactory,
                 FileStoreWrite::createEmptyWriter,
                 (commit, committable) ->
                         commit.overwrite(partition, committable, Collections.emptyMap()));
     }
 
-    private static List<Snapshot> commitDataImpl(
+    private List<Snapshot> commitDataImpl(
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRowData> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator,
-            FileFormat fileFormat,
-            FileStorePathFactory pathFactory,
             QuadFunction<FileStoreWrite, BinaryRowData, Integer, ExecutorService, RecordWriter>
                     createWriterFunction,
             BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
             throws Exception {
-        FileStoreWrite write = createWrite(fileFormat, pathFactory);
+        FileStoreWrite write = newWrite();
         Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
         for (KeyValue kv : kvs) {
             BinaryRowData partition = partitionCalculator.apply(kv);
@@ -222,7 +161,7 @@ public class OperationTestUtils {
                     .write(kv.valueKind(), kv.key(), kv.value());
         }
 
-        FileStoreCommit commit = createCommit(fileFormat, pathFactory);
+        FileStoreCommit commit = newCommit();
         ManifestCommittable committable =
                 new ManifestCommittable(String.valueOf(new Random().nextLong()));
         for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
@@ -235,6 +174,7 @@ public class OperationTestUtils {
             }
         }
 
+        FileStorePathFactory pathFactory = pathFactory();
         Long snapshotIdBeforeCommit = pathFactory.latestSnapshotId();
         if (snapshotIdBeforeCommit == null) {
             snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
@@ -263,17 +203,19 @@ public class OperationTestUtils {
         return snapshots;
     }
 
-    public static List<KeyValue> readKvsFromSnapshot(
-            long snapshotId, FileFormat fileFormat, FileStorePathFactory pathFactory)
-            throws IOException {
-        List<ManifestEntry> entries =
-                createScan(fileFormat, pathFactory).withSnapshot(snapshotId).plan().files();
-        return readKvsFromManifestEntries(entries, fileFormat, pathFactory);
+    public List<KeyValue> readKvsFromSnapshot(long snapshotId) throws IOException {
+        List<ManifestEntry> entries = newScan().withSnapshot(snapshotId).plan().files();
+        return readKvsFromManifestEntries(entries);
     }
 
-    public static List<KeyValue> readKvsFromManifestEntries(
-            List<ManifestEntry> entries, FileFormat fileFormat, FileStorePathFactory pathFactory)
+    public List<KeyValue> readKvsFromManifestEntries(List<ManifestEntry> entries)
             throws IOException {
+        if (LOG.isDebugEnabled()) {
+            for (ManifestEntry entry : entries) {
+                LOG.debug("reading from " + entry.toString());
+            }
+        }
+
         Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> filesPerPartitionAndBucket =
                 new HashMap<>();
         for (ManifestEntry entry : entries) {
@@ -284,7 +226,7 @@ public class OperationTestUtils {
         }
 
         List<KeyValue> kvs = new ArrayList<>();
-        FileStoreRead read = createRead(fileFormat, pathFactory);
+        FileStoreRead read = newRead();
         for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> entryWithPartition :
                 filesPerPartitionAndBucket.entrySet()) {
             for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
@@ -296,23 +238,18 @@ public class OperationTestUtils {
                                         entryWithBucket.getKey(),
                                         entryWithBucket.getValue()));
                 while (iterator.hasNext()) {
-                    kvs.add(
-                            iterator.next()
-                                    .copy(
-                                            TestKeyValueGenerator.KEY_SERIALIZER,
-                                            TestKeyValueGenerator.ROW_SERIALIZER));
+                    kvs.add(iterator.next().copy(keySerializer, valueSerializer));
                 }
             }
         }
         return kvs;
     }
 
-    public static Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
+    public Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
         Map<BinaryRowData, BinaryRowData> result = new HashMap<>();
         for (KeyValue kv : kvs) {
-            BinaryRowData key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key()).copy();
-            BinaryRowData value =
-                    TestKeyValueGenerator.ROW_SERIALIZER.toBinaryRow(kv.value()).copy();
+            BinaryRowData key = keySerializer.toBinaryRow(kv.key()).copy();
+            BinaryRowData value = valueSerializer.toBinaryRow(kv.value()).copy();
             switch (kv.valueKind()) {
                 case ADD:
                     result.put(key, value);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 136dff1..c558ed4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -53,10 +53,6 @@ public class FileStoreCommitTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitTest.class);
 
-    private final FileFormat avro =
-            FileFormat.fromIdentifier(
-                    FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
-
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
 
@@ -108,13 +104,13 @@ public class FileStoreCommitTest {
         for (int i = 0; i < numThreads; i++) {
             TestCommitThread thread =
                     new TestCommitThread(
-                            dataPerThread.get(i),
-                            OperationTestUtils.createPathFactory(failing, tempDir.toString()),
-                            OperationTestUtils.createPathFactory(false, tempDir.toString()));
+                            dataPerThread.get(i), createStore(failing), createStore(false));
             thread.start();
             threads.add(thread);
         }
 
+        TestFileStore store = createStore(false);
+
         // calculate expected results
         Map<BinaryRowData, List<KeyValue>> threadResults = new HashMap<>();
         for (TestCommitThread thread : threads) {
@@ -124,21 +120,18 @@ public class FileStoreCommitTest {
             }
         }
         Map<BinaryRowData, BinaryRowData> expected =
-                OperationTestUtils.toKvMap(
+                store.toKvMap(
                         threadResults.values().stream()
                                 .flatMap(Collection::stream)
                                 .collect(Collectors.toList()));
 
         // read actual data and compare
-        FileStorePathFactory safePathFactory =
-                OperationTestUtils.createPathFactory(false, tempDir.toString());
-        Long snapshotId = safePathFactory.latestSnapshotId();
+        Long snapshotId = store.pathFactory().latestSnapshotId();
         assertThat(snapshotId).isNotNull();
-        List<KeyValue> actualKvs =
-                OperationTestUtils.readKvsFromSnapshot(snapshotId, avro, safePathFactory);
+        List<KeyValue> actualKvs = store.readKvsFromSnapshot(snapshotId);
         gen.sort(actualKvs);
         logData(() -> actualKvs, "raw read results");
-        Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+        Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
         logData(() -> kvMapToKvList(expected), "expected");
         logData(() -> kvMapToKvList(actual), "actual");
         assertThat(actual).isEqualTo(expected);
@@ -155,14 +148,11 @@ public class FileStoreCommitTest {
                                 .collect(Collectors.toList()),
                 "data1");
 
-        FileStorePathFactory pathFactory =
-                OperationTestUtils.createPathFactory(false, tempDir.toString());
-        OperationTestUtils.commitData(
+        TestFileStore store = createStore(false);
+        store.commitData(
                 data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                 gen::getPartition,
-                kv -> 0,
-                avro,
-                pathFactory);
+                kv -> 0);
 
         ThreadLocalRandom random = ThreadLocalRandom.current();
         String dtToOverwrite =
@@ -186,12 +176,10 @@ public class FileStoreCommitTest {
                                 .flatMap(Collection::stream)
                                 .collect(Collectors.toList()),
                 "data2");
-        OperationTestUtils.overwriteData(
+        store.overwriteData(
                 data2.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                 gen::getPartition,
                 kv -> 0,
-                avro,
-                pathFactory,
                 partitionToOverwrite);
 
         List<KeyValue> expectedKvs = new ArrayList<>();
@@ -203,19 +191,33 @@ public class FileStoreCommitTest {
         }
         data2.values().forEach(expectedKvs::addAll);
         gen.sort(expectedKvs);
-        Map<BinaryRowData, BinaryRowData> expected = OperationTestUtils.toKvMap(expectedKvs);
+        Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
 
         List<KeyValue> actualKvs =
-                OperationTestUtils.readKvsFromSnapshot(
-                        pathFactory.latestSnapshotId(), avro, pathFactory);
+                store.readKvsFromSnapshot(store.pathFactory().latestSnapshotId());
         gen.sort(actualKvs);
-        Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+        Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
 
         logData(() -> kvMapToKvList(expected), "expected");
         logData(() -> kvMapToKvList(actual), "actual");
         assertThat(actual).isEqualTo(expected);
     }
 
+    private TestFileStore createStore(boolean failing) {
+        String root =
+                failing
+                        ? FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString())
+                        : TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString();
+        return TestFileStore.create(
+                "avro",
+                root,
+                1,
+                TestKeyValueGenerator.PARTITION_TYPE,
+                TestKeyValueGenerator.KEY_TYPE,
+                TestKeyValueGenerator.ROW_TYPE,
+                new DeduplicateAccumulator());
+    }
+
     private Map<BinaryRowData, List<KeyValue>> generateData(int numRecords) {
         Map<BinaryRowData, List<KeyValue>> data = new HashMap<>();
         for (int i = 0; i < numRecords; i++) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 54ba19f..e007ca0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -43,26 +43,31 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link FileStoreExpireImpl}. */
 public class FileStoreExpireTest {
 
-    private final FileFormat avro =
-            FileFormat.fromIdentifier(
-                    FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
-
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
+    private TestFileStore store;
     private FileStorePathFactory pathFactory;
 
     @BeforeEach
     public void beforeEach() throws IOException {
         gen = new TestKeyValueGenerator();
-        pathFactory = OperationTestUtils.createPathFactory(false, tempDir.toString());
         Path root = new Path(tempDir.toString());
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+        store =
+                TestFileStore.create(
+                        "avro",
+                        tempDir.toString(),
+                        1,
+                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        new DeduplicateAccumulator());
+        pathFactory = store.pathFactory();
     }
 
     @Test
     public void testNoSnapshot() {
-        FileStoreExpire expire =
-                OperationTestUtils.createExpire(3, Long.MAX_VALUE, avro, pathFactory);
+        FileStoreExpire expire = store.newExpire(3, Long.MAX_VALUE);
         expire.expire();
 
         assertThat(pathFactory.latestSnapshotId()).isNull();
@@ -74,9 +79,7 @@ public class FileStoreExpireTest {
         List<Integer> snapshotPositions = new ArrayList<>();
         commit(2, allData, snapshotPositions);
         int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
-        FileStoreExpire expire =
-                OperationTestUtils.createExpire(
-                        latestSnapshotId + 1, Long.MAX_VALUE, avro, pathFactory);
+        FileStoreExpire expire = store.newExpire(latestSnapshotId + 1, Long.MAX_VALUE);
         expire.expire();
 
         FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
@@ -92,9 +95,7 @@ public class FileStoreExpireTest {
         List<Integer> snapshotPositions = new ArrayList<>();
         commit(5, allData, snapshotPositions);
         int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
-        FileStoreExpire expire =
-                OperationTestUtils.createExpire(
-                        Integer.MAX_VALUE, Long.MAX_VALUE, avro, pathFactory);
+        FileStoreExpire expire = store.newExpire(Integer.MAX_VALUE, Long.MAX_VALUE);
         expire.expire();
 
         FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
@@ -111,8 +112,7 @@ public class FileStoreExpireTest {
         commit(3, allData, snapshotPositions);
         int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
         Thread.sleep(100);
-        FileStoreExpire expire =
-                OperationTestUtils.createExpire(Integer.MAX_VALUE, 1, avro, pathFactory);
+        FileStoreExpire expire = store.newExpire(Integer.MAX_VALUE, 1);
         expire.expire();
 
         FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
@@ -125,8 +125,7 @@ public class FileStoreExpireTest {
 
     @Test
     public void testExpireWithNumber() throws Exception {
-        FileStoreExpire expire =
-                OperationTestUtils.createExpire(3, Long.MAX_VALUE, avro, pathFactory);
+        FileStoreExpire expire = store.newExpire(3, Long.MAX_VALUE);
 
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
@@ -149,8 +148,7 @@ public class FileStoreExpireTest {
 
     @Test
     public void testExpireWithTime() throws Exception {
-        FileStoreExpire expire =
-                OperationTestUtils.createExpire(Integer.MAX_VALUE, 1000, avro, pathFactory);
+        FileStoreExpire expire = store.newExpire(Integer.MAX_VALUE, 1000);
 
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
@@ -183,9 +181,7 @@ public class FileStoreExpireTest {
                 data.add(gen.next());
             }
             allData.addAll(data);
-            List<Snapshot> snapshots =
-                    OperationTestUtils.commitData(
-                            data, gen::getPartition, kv -> 0, avro, pathFactory);
+            List<Snapshot> snapshots = store.commitData(data, gen::getPartition, kv -> 0);
             for (int j = 0; j < snapshots.size(); j++) {
                 snapshotPositions.add(allData.size());
             }
@@ -196,18 +192,12 @@ public class FileStoreExpireTest {
             int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions)
             throws Exception {
         Map<BinaryRowData, BinaryRowData> expected =
-                OperationTestUtils.toKvMap(
-                        allData.subList(0, snapshotPositions.get(snapshotId - 1)));
+                store.toKvMap(allData.subList(0, snapshotPositions.get(snapshotId - 1)));
         List<KeyValue> actualKvs =
-                OperationTestUtils.readKvsFromManifestEntries(
-                        OperationTestUtils.createScan(avro, pathFactory)
-                                .withSnapshot(snapshotId)
-                                .plan()
-                                .files(),
-                        avro,
-                        pathFactory);
+                store.readKvsFromManifestEntries(
+                        store.newScan().withSnapshot(snapshotId).plan().files());
         gen.sort(actualKvs);
-        Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+        Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
         assertThat(actual).isEqualTo(expected);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 6f21c9e..bde976a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -18,19 +18,19 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
@@ -50,23 +50,29 @@ public class FileStoreScanTest {
 
     private static final int NUM_BUCKETS = 10;
 
-    private final FileFormat avro =
-            FileFormat.fromIdentifier(
-                    FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
-
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
+    private TestFileStore store;
     private FileStorePathFactory pathFactory;
 
     @BeforeEach
     public void beforeEach() throws IOException {
         gen = new TestKeyValueGenerator();
-        pathFactory = OperationTestUtils.createPathFactory(false, tempDir.toString());
         Path root = new Path(tempDir.toString());
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+        store =
+                TestFileStore.create(
+                        "avro",
+                        tempDir.toString(),
+                        NUM_BUCKETS,
+                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        new DeduplicateAccumulator());
+        pathFactory = store.pathFactory();
     }
 
-    @RepeatedTest(10)
+    @Test
     public void testWithPartitionFilter() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         List<KeyValue> data = generateData(random.nextInt(1000) + 1);
@@ -82,12 +88,12 @@ public class FileStoreScanTest {
             wantedPartitions.add(partitions.get(random.nextInt(partitions.size())));
         }
 
-        FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+        FileStoreScan scan = store.newScan();
         scan.withSnapshot(snapshot.id());
         scan.withPartitionFilter(new ArrayList<>(wantedPartitions));
 
         Map<BinaryRowData, BinaryRowData> expected =
-                OperationTestUtils.toKvMap(
+                store.toKvMap(
                         wantedPartitions.isEmpty()
                                 ? data
                                 : data.stream()
@@ -99,7 +105,7 @@ public class FileStoreScanTest {
         runTest(scan, snapshot.id(), expected);
     }
 
-    @RepeatedTest(10)
+    @Test
     public void testWithBucket() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         List<KeyValue> data = generateData(random.nextInt(1000) + 1);
@@ -107,19 +113,19 @@ public class FileStoreScanTest {
 
         int wantedBucket = random.nextInt(NUM_BUCKETS);
 
-        FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+        FileStoreScan scan = store.newScan();
         scan.withSnapshot(snapshot.id());
         scan.withBucket(wantedBucket);
 
         Map<BinaryRowData, BinaryRowData> expected =
-                OperationTestUtils.toKvMap(
+                store.toKvMap(
                         data.stream()
                                 .filter(kv -> getBucket(kv) == wantedBucket)
                                 .collect(Collectors.toList()));
         runTest(scan, snapshot.id(), expected);
     }
 
-    @RepeatedTest(10)
+    @Test
     public void testWithSnapshot() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int numCommits = random.nextInt(10) + 1;
@@ -134,18 +140,18 @@ public class FileStoreScanTest {
         }
         long wantedSnapshot = snapshots.get(wantedCommit).id();
 
-        FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+        FileStoreScan scan = store.newScan();
         scan.withSnapshot(wantedSnapshot);
 
         Map<BinaryRowData, BinaryRowData> expected =
-                OperationTestUtils.toKvMap(
+                store.toKvMap(
                         allData.subList(0, wantedCommit + 1).stream()
                                 .flatMap(Collection::stream)
                                 .collect(Collectors.toList()));
         runTest(scan, wantedSnapshot, expected);
     }
 
-    @RepeatedTest(10)
+    @Test
     public void testWithManifestList() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int numCommits = random.nextInt(10) + 1;
@@ -154,22 +160,19 @@ public class FileStoreScanTest {
             writeData(data);
         }
 
-        ManifestList manifestList =
-                new ManifestList.Factory(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory)
-                        .create();
+        ManifestList manifestList = store.manifestListFactory().create();
         long wantedSnapshot = random.nextLong(pathFactory.latestSnapshotId()) + 1;
         List<ManifestFileMeta> wantedManifests =
                 manifestList.read(
                         Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshot))
                                 .manifestList());
 
-        FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+        FileStoreScan scan = store.newScan();
         scan.withManifestList(wantedManifests);
 
-        List<KeyValue> expectedKvs =
-                OperationTestUtils.readKvsFromSnapshot(wantedSnapshot, avro, pathFactory);
+        List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshot);
         gen.sort(expectedKvs);
-        Map<BinaryRowData, BinaryRowData> expected = OperationTestUtils.toKvMap(expectedKvs);
+        Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
         runTest(scan, null, expected);
     }
 
@@ -179,10 +182,9 @@ public class FileStoreScanTest {
         FileStoreScan.Plan plan = scan.plan();
         assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
 
-        List<KeyValue> actualKvs =
-                OperationTestUtils.readKvsFromManifestEntries(plan.files(), avro, pathFactory);
+        List<KeyValue> actualKvs = store.readKvsFromManifestEntries(plan.files());
         gen.sort(actualKvs);
-        Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+        Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
         assertThat(actual).isEqualTo(expected);
     }
 
@@ -195,9 +197,7 @@ public class FileStoreScanTest {
     }
 
     private Snapshot writeData(List<KeyValue> kvs) throws Exception {
-        List<Snapshot> snapshots =
-                OperationTestUtils.commitData(
-                        kvs, gen::getPartition, this::getBucket, avro, pathFactory);
+        List<Snapshot> snapshots = store.commitData(kvs, gen::getPartition, this::getBucket);
         return snapshots.get(snapshots.size() - 1);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index c6b6501..7ed31a8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,17 +52,14 @@ public class TestCommitThread extends Thread {
 
     public TestCommitThread(
             Map<BinaryRowData, List<KeyValue>> data,
-            FileStorePathFactory testPathFactory,
-            FileStorePathFactory safePathFactory) {
+            TestFileStore testStore,
+            TestFileStore safeStore) {
         this.data = data;
         this.result = new HashMap<>();
         this.writers = new HashMap<>();
 
-        FileFormat avro =
-                FileFormat.fromIdentifier(
-                        FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
-        this.write = OperationTestUtils.createWrite(avro, safePathFactory);
-        this.commit = OperationTestUtils.createCommit(avro, testPathFactory);
+        this.write = safeStore.newWrite();
+        this.commit = testStore.newCommit();
     }
 
     public Map<BinaryRowData, List<KeyValue>> getResult() {