Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 08:55:22 UTC

[flink-table-store] 06/06: [FLINK-25628] Introduce SstFile and SstFileMeta

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 7ac835ffe828a511689ac080d90d2a729aa08d89
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:56:26 2022 +0800

    [FLINK-25628] Introduce SstFile and SstFileMeta
    
    This closes #4
    
    Co-authored-by: tsreaper <ts...@gmail.com>
---
 .../table/store/file/mergetree/sst/SstFile.java    | 269 +++++++++++++++++++++
 .../store/file/mergetree/sst/SstFileMeta.java      | 186 ++++++++++++++
 .../file/mergetree/sst/SstFileMetaSerializer.java  |  69 ++++++
 .../flink/table/store/file/utils/FileUtils.java    |  76 ++++++
 .../mergetree/sst/SstFileMetaSerializerTest.java   |  39 +++
 .../store/file/mergetree/sst/SstFileTest.java      | 245 +++++++++++++++++++
 .../file/mergetree/sst/SstTestDataGenerator.java   | 178 ++++++++++++++
 .../file/utils/FailingAtomicRenameFileSystem.java  | 143 +++++++++++
 .../store/file/utils/FileStorePathFactory.java     | 112 +++++++++
 .../file/utils/TestAtomicRenameFileSystem.java     | 121 +++++++++
 .../org.apache.flink.core.fs.FileSystemFactory     |  17 ++
 11 files changed, 1455 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
new file mode 100644
index 0000000..d73ff45
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reads and writes sst files. Each sst file contains several {@link KeyValue}s, representing
+ * the changes inserted into the file store.
+ */
+public class SstFile {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SstFile.class);
+
+    private final RowType keyType;
+    private final RowType valueType;
+
+    private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final SstPathFactory pathFactory;
+    private final long suggestedFileSize;
+
+    public SstFile(
+            RowType keyType,
+            RowType valueType,
+            FileFormat fileFormat,
+            SstPathFactory pathFactory,
+            long suggestedFileSize) {
+        this.keyType = keyType;
+        this.valueType = valueType;
+
+        RowType recordType = KeyValue.schema(keyType, valueType);
+        this.readerFactory = fileFormat.createReaderFactory(recordType);
+        this.writerFactory = fileFormat.createWriterFactory(recordType);
+        this.pathFactory = pathFactory;
+        this.suggestedFileSize = suggestedFileSize;
+    }
+
+    public RowType keyType() {
+        return keyType;
+    }
+
+    public RowType valueType() {
+        return valueType;
+    }
+
+    @VisibleForTesting
+    public long suggestedFileSize() {
+        return suggestedFileSize;
+    }
+
+    public RecordReader read(String fileName) throws IOException {
+        return new SstFileRecordReader(pathFactory.toPath(fileName));
+    }
+
+    /**
+     * Write several {@link KeyValue}s into sst files of the given level, rolling to a new file
+     * whenever the suggested file size is exceeded.
+     *
+     * <p>NOTE: This method is atomic: either all files are successfully written, or, if any
+     * exception occurs, all finished and in-progress files are deleted.
+     */
+    public List<SstFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
+            throws Exception {
+        List<SstFileMeta> result = new ArrayList<>();
+
+        RollingFile rollingFile = null;
+        Path currentPath = null;
+        try {
+            while (iterator.hasNext()) {
+                if (rollingFile == null) {
+                    currentPath = pathFactory.newPath();
+                    rollingFile = new RollingFile(currentPath, suggestedFileSize);
+                }
+                rollingFile.write(iterator.next());
+                if (rollingFile.exceedsSuggestedFileSize()) {
+                    result.add(rollingFile.finish(level));
+                    rollingFile = null;
+                }
+            }
+            // finish last file
+            if (rollingFile != null) {
+                result.add(rollingFile.finish(level));
+            }
+            iterator.close();
+        } catch (Throwable e) {
+            LOG.warn("Exception occurred when writing sst files. Cleaning up.", e);
+            // clean up finished files
+            for (SstFileMeta meta : result) {
+                FileUtils.deleteOrWarn(pathFactory.toPath(meta.fileName()));
+            }
+            // clean up in-progress file
+            if (currentPath != null) {
+                FileUtils.deleteOrWarn(currentPath);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    public void delete(SstFileMeta file) {
+        FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
+    }
+
+    private class SstFileRecordReader implements RecordReader {
+
+        private final BulkFormat.Reader<RowData> reader;
+        private final KeyValueSerializer serializer;
+
+        private SstFileRecordReader(Path path) throws IOException {
+            long fileSize = FileUtils.getFileSize(path);
+            FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize);
+            this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+            this.serializer = new KeyValueSerializer(keyType, valueType);
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator readBatch() throws IOException {
+            BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
+            return iterator == null ? null : new SstFileRecordIterator(iterator, serializer);
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+    }
+
+    private static class SstFileRecordIterator implements RecordReader.RecordIterator {
+
+        private final BulkFormat.RecordIterator<RowData> iterator;
+        private final KeyValueSerializer serializer;
+
+        private SstFileRecordIterator(
+                BulkFormat.RecordIterator<RowData> iterator, KeyValueSerializer serializer) {
+            this.iterator = iterator;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public KeyValue next() throws IOException {
+            RecordAndPosition<RowData> result = iterator.next();
+            return result == null ? null : serializer.fromRow(result.getRecord());
+        }
+
+        @Override
+        public void releaseBatch() {
+            iterator.releaseBatch();
+        }
+    }
+
+    private class RollingFile {
+        private final Path path;
+        private final long suggestedFileSize;
+
+        private final FSDataOutputStream out;
+        private final BulkWriter<RowData> writer;
+        private final KeyValueSerializer serializer;
+        private final RowDataSerializer keySerializer;
+
+        private long rowCount;
+        private BinaryRowData minKey;
+        private RowData maxKey;
+        private long minSequenceNumber;
+        private long maxSequenceNumber;
+
+        private RollingFile(Path path, long suggestedFileSize) throws IOException {
+            this.path = path;
+            this.suggestedFileSize = suggestedFileSize;
+
+            this.out =
+                    this.path.getFileSystem().create(this.path, FileSystem.WriteMode.NO_OVERWRITE);
+            this.writer = writerFactory.create(out);
+            this.serializer = new KeyValueSerializer(keyType, valueType);
+            this.keySerializer = new RowDataSerializer(keyType);
+
+            this.rowCount = 0;
+            this.minKey = null;
+            this.maxKey = null;
+            this.minSequenceNumber = Long.MAX_VALUE;
+            this.maxSequenceNumber = Long.MIN_VALUE;
+        }
+
+        private void write(KeyValue kv) throws IOException {
+            writer.addElement(serializer.toRow(kv));
+
+            rowCount++;
+            if (minKey == null) {
+                minKey = keySerializer.toBinaryRow(kv.key()).copy();
+            }
+            maxKey = kv.key();
+            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
+            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
+        }
+
+        private boolean exceedsSuggestedFileSize() throws IOException {
+            // NOTE: this method is inaccurate for formats buffering changes in memory
+            return out.getPos() >= suggestedFileSize;
+        }
+
+        private SstFileMeta finish(int level) throws IOException {
+            writer.finish();
+            out.close();
+
+            // TODO
+            //  1. Read statistics directly from the written orc/parquet files.
+            //  2. For other file formats use StatsCollector. Make sure fields are not reused,
+            //     otherwise we need to copy them.
+            FieldStats[] stats = new FieldStats[valueType.getFieldCount()];
+            for (int i = 0; i < stats.length; i++) {
+                stats[i] = new FieldStats(null, null, 0);
+            }
+
+            return new SstFileMeta(
+                    path.getName(),
+                    FileUtils.getFileSize(path),
+                    rowCount,
+                    minKey,
+                    keySerializer.toBinaryRow(maxKey).copy(),
+                    stats,
+                    minSequenceNumber,
+                    maxSequenceNumber,
+                    level);
+        }
+    }
+}
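
For orientation, a minimal usage sketch of the class above. The wiring is hypothetical:
keyType, valueType, sstPathFactory and the sorted list kvs are assumed to exist (the real
setup lives in SstFileTest further down):

    // sketch only: write a sorted run of KeyValues to level 0, then read the first file back
    FileFormat avro =
            FileFormat.fromIdentifier(SstFile.class.getClassLoader(), "avro", new Configuration());
    SstFile sstFile = new SstFile(keyType, valueType, avro, sstPathFactory, 1024 * 1024);
    List<SstFileMeta> metas = sstFile.write(CloseableIterator.fromList(kvs, kv -> {}), 0);
    CloseableIterator<KeyValue> it =
            new RecordReaderIterator(sstFile.read(metas.get(0).fileName()));
    while (it.hasNext()) {
        KeyValue kv = it.next(); // entries come back in the order they were written
    }
    it.close();
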
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
new file mode 100644
index 0000000..5a7e99a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Metadata of an SST file. */
+public class SstFileMeta {
+
+    private final String fileName;
+    private final long fileSize;
+    private final long rowCount;
+
+    private final BinaryRowData minKey;
+    private final BinaryRowData maxKey;
+    private final FieldStats[] stats;
+
+    private final long minSequenceNumber;
+    private final long maxSequenceNumber;
+    private final int level;
+
+    public SstFileMeta(
+            String fileName,
+            long fileSize,
+            long rowCount,
+            BinaryRowData minKey,
+            BinaryRowData maxKey,
+            FieldStats[] stats,
+            long minSequenceNumber,
+            long maxSequenceNumber,
+            int level) {
+        this.fileName = fileName;
+        this.fileSize = fileSize;
+        this.rowCount = rowCount;
+
+        this.minKey = minKey;
+        this.maxKey = maxKey;
+        this.stats = stats;
+
+        this.minSequenceNumber = minSequenceNumber;
+        this.maxSequenceNumber = maxSequenceNumber;
+        this.level = level;
+    }
+
+    public String fileName() {
+        return fileName;
+    }
+
+    public long fileSize() {
+        return fileSize;
+    }
+
+    public long rowCount() {
+        return rowCount;
+    }
+
+    public BinaryRowData minKey() {
+        return minKey;
+    }
+
+    public BinaryRowData maxKey() {
+        return maxKey;
+    }
+
+    /** Elements in the array may be null, indicating that the statistics of the corresponding field are unknown. */
+    public FieldStats[] stats() {
+        return stats;
+    }
+
+    public long minSequenceNumber() {
+        return minSequenceNumber;
+    }
+
+    public long maxSequenceNumber() {
+        return maxSequenceNumber;
+    }
+
+    public int level() {
+        return level;
+    }
+
+    public SstFileMeta upgrade(int newLevel) {
+        checkArgument(newLevel > this.level);
+        return new SstFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                stats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                newLevel);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof SstFileMeta)) {
+            return false;
+        }
+        SstFileMeta that = (SstFileMeta) o;
+        return Objects.equals(fileName, that.fileName)
+                && fileSize == that.fileSize
+                && rowCount == that.rowCount
+                && Objects.equals(minKey, that.minKey)
+                && Objects.equals(maxKey, that.maxKey)
+                && Arrays.equals(stats, that.stats)
+                && minSequenceNumber == that.minSequenceNumber
+                && maxSequenceNumber == that.maxSequenceNumber
+                && level == that.level;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                // by default, the hash code of an array is computed by reference, not by
+                // content, so we must use Arrays.hashCode to hash by content.
+                Arrays.hashCode(stats),
+                minSequenceNumber,
+                maxSequenceNumber,
+                level);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "{%s, %d, %d, %s, %s, %s, %d, %d, %d}",
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                Arrays.toString(stats),
+                minSequenceNumber,
+                maxSequenceNumber,
+                level);
+    }
+
+    public static RowType schema(RowType keyType, RowType rowType) {
+        List<RowType.RowField> fields = new ArrayList<>();
+        fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)));
+        fields.add(new RowType.RowField("_FILE_SIZE", new BigIntType(false)));
+        fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false)));
+        fields.add(new RowType.RowField("_MIN_KEY", keyType));
+        fields.add(new RowType.RowField("_MAX_KEY", keyType));
+        fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(rowType)));
+        fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
+        fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
+        fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
+        return new RowType(fields);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
new file mode 100644
index 0000000..0e176d1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Serializer for {@link SstFileMeta}. */
+public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
+
+    private final RowDataSerializer keySerializer;
+    private final FieldStatsArraySerializer statsArraySerializer;
+
+    public SstFileMetaSerializer(RowType keyType, RowType rowType) {
+        super(SstFileMeta.schema(keyType, rowType));
+        this.keySerializer = new RowDataSerializer(keyType);
+        this.statsArraySerializer = new FieldStatsArraySerializer(rowType);
+    }
+
+    @Override
+    public RowData toRow(SstFileMeta meta) {
+        return GenericRowData.of(
+                StringData.fromString(meta.fileName()),
+                meta.fileSize(),
+                meta.rowCount(),
+                meta.minKey(),
+                meta.maxKey(),
+                statsArraySerializer.toRow(meta.stats()),
+                meta.minSequenceNumber(),
+                meta.maxSequenceNumber(),
+                meta.level());
+    }
+
+    @Override
+    public SstFileMeta fromRow(RowData row) {
+        int keyFieldCount = keySerializer.getArity();
+        return new SstFileMeta(
+                row.getString(0).toString(),
+                row.getLong(1),
+                row.getLong(2),
+                keySerializer.toBinaryRow(row.getRow(3, keyFieldCount)).copy(),
+                keySerializer.toBinaryRow(row.getRow(4, keyFieldCount)).copy(),
+                statsArraySerializer.fromRow(row.getRow(5, statsArraySerializer.numFields())),
+                row.getLong(6),
+                row.getLong(7),
+                row.getInt(8));
+    }
+}
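
A round trip through this serializer (asserted for every rolled file in SstFileTest below);
keyType, rowType and meta are assumed:

    SstFileMetaSerializer serializer = new SstFileMetaSerializer(keyType, rowType);
    RowData row = serializer.toRow(meta);       // SstFileMeta -> RowData
    SstFileMeta copy = serializer.fromRow(row); // RowData -> SstFileMeta
    // copy.equals(meta) holds, since SstFileMeta compares by content
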
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
new file mode 100644
index 0000000..b787b9a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.Utils;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Utils for file reading and writing. */
+public class FileUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
+
+    public static final Configuration DEFAULT_READER_CONFIG = new Configuration();
+
+    static {
+        DEFAULT_READER_CONFIG.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
+    }
+
+    public static <T> List<T> readListFromFile(
+            Path path,
+            ObjectSerializer<T> serializer,
+            BulkFormat<RowData, FileSourceSplit> readerFactory)
+            throws IOException {
+        List<T> result = new ArrayList<>();
+        long fileSize = FileUtils.getFileSize(path);
+        FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize);
+        BulkFormat.Reader<RowData> reader =
+                readerFactory.createReader(DEFAULT_READER_CONFIG, split);
+        Utils.forEachRemaining(reader, row -> result.add(serializer.fromRow(row)));
+        return result;
+    }
+
+    public static long getFileSize(Path path) throws IOException {
+        return path.getFileSystem().getFileStatus(path).getLen();
+    }
+
+    public static void deleteOrWarn(Path file) {
+        try {
+            FileSystem fs = file.getFileSystem();
+            if (!fs.delete(file, false) && fs.exists(file)) {
+                LOG.warn("Failed to delete file " + file);
+            }
+        } catch (IOException e) {
+            LOG.warn("Exception occurred when deleting file " + file, e);
+        }
+    }
+}
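
readListFromFile combines a BulkFormat reader with an ObjectSerializer to load a whole
metadata file in one call; a hedged sketch, with path, keyType, rowType and readerFactory
assumed:

    List<SstFileMeta> metas =
            FileUtils.readListFromFile(
                    path, new SstFileMetaSerializer(keyType, rowType), readerFactory);
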
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java
new file mode 100644
index 0000000..28e03b6
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase;
+
+/** Tests for {@link SstFileMetaSerializer}. */
+public class SstFileMetaSerializerTest extends ObjectSerializerTestBase<SstFileMeta> {
+
+    private final SstTestDataGenerator gen = SstTestDataGenerator.builder().build();
+
+    @Override
+    protected SstFileMetaSerializer serializer() {
+        return new SstFileMetaSerializer(
+                TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+    }
+
+    @Override
+    protected SstFileMeta object() {
+        return gen.next().meta;
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
new file mode 100644
index 0000000..de6ed25
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueSerializerTest;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SstFile}. */
+public class SstFileTest {
+
+    private final SstTestDataGenerator gen =
+            SstTestDataGenerator.builder().memTableCapacity(20).build();
+    private final FileFormat flushingAvro = new FlushingAvroFormat();
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @RepeatedTest(10)
+    public void testWriteAndReadSstFile() throws Exception {
+        SstTestDataGenerator.SstFile data = gen.next();
+        SstFile sstFile = createSstFile(tempDir.toString());
+        SstFileMetaSerializer serializer =
+                new SstFileMetaSerializer(
+                        TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+
+        List<SstFileMeta> actualMetas =
+                sstFile.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+
+        checkRollingFiles(data.meta, actualMetas, sstFile.suggestedFileSize());
+
+        Iterator<KeyValue> expectedIterator = data.content.iterator();
+        for (SstFileMeta meta : actualMetas) {
+            // check the contents of sst file
+            CloseableIterator<KeyValue> actualKvsIterator =
+                    new RecordReaderIterator(sstFile.read(meta.fileName()));
+            while (actualKvsIterator.hasNext()) {
+                assertThat(expectedIterator.hasNext()).isTrue();
+                KeyValue actualKv = actualKvsIterator.next();
+                assertThat(
+                                KeyValueSerializerTest.equals(
+                                        expectedIterator.next(),
+                                        actualKv,
+                                        TestKeyValueGenerator.KEY_SERIALIZER,
+                                        TestKeyValueGenerator.ROW_SERIALIZER))
+                        .isTrue();
+            }
+            actualKvsIterator.close();
+
+            // check that each sst file meta is serializable
+            assertThat(serializer.fromRow(serializer.toRow(meta))).isEqualTo(meta);
+        }
+        assertThat(expectedIterator.hasNext()).isFalse();
+    }
+
+    @RepeatedTest(10)
+    public void testCleanUpForException() throws IOException {
+        FailingAtomicRenameFileSystem.resetFailCounter(1);
+        FailingAtomicRenameFileSystem.setFailPossibility(10);
+        SstTestDataGenerator.SstFile data = gen.next();
+        SstFile sstFile =
+                createSstFile(FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+
+        try {
+            sstFile.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+        } catch (Throwable e) {
+            assertThat(e)
+                    .isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class);
+            Path root = new Path(tempDir.toString());
+            FileSystem fs = root.getFileSystem();
+            for (FileStatus bucketStatus : fs.listStatus(root)) {
+                assertThat(bucketStatus.isDir()).isTrue();
+                assertThat(fs.listStatus(bucketStatus.getPath())).isEmpty();
+            }
+        }
+    }
+
+    private SstFile createSstFile(String path) {
+        FileStorePathFactory fileStorePathFactory = new FileStorePathFactory(new Path(path));
+        SstPathFactory sstPathFactory = fileStorePathFactory.createSstPathFactory(null, 0);
+        int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
+        return new SstFile(
+                TestKeyValueGenerator.KEY_TYPE,
+                TestKeyValueGenerator.ROW_TYPE,
+                // the normal avro format buffers changes in memory, so we cannot determine
+                // whether the written file size really exceeds the suggested size; we
+                // therefore use a special avro format which flushes after every added element
+                flushingAvro,
+                sstPathFactory,
+                suggestedFileSize);
+    }
+
+    private void checkRollingFiles(
+            SstFileMeta expected, List<SstFileMeta> actual, long suggestedFileSize) {
+        // all but the last file should be no smaller than suggestedFileSize
+        for (int i = 0; i + 1 < actual.size(); i++) {
+            assertThat(actual.get(i).fileSize() >= suggestedFileSize).isTrue();
+        }
+
+        // expected.rowCount == sum(rowCount)
+        assertThat(actual.stream().mapToLong(SstFileMeta::rowCount).sum())
+                .isEqualTo(expected.rowCount());
+
+        // expected.minKey == firstFile.minKey
+        assertThat(actual.get(0).minKey()).isEqualTo(expected.minKey());
+
+        // expected.maxKey == lastFile.maxKey
+        assertThat(actual.get(actual.size() - 1).maxKey()).isEqualTo(expected.maxKey());
+
+        // TODO check stats after they're collected
+        /*
+        for (int i = 0; i < expected.stats().length; i++) {
+            List<FieldStats> actualStats = new ArrayList<>();
+            for (SstFileMeta meta : actual) {
+                actualStats.add(meta.stats()[i]);
+            }
+            checkRollingFileStats(expected.stats()[i], actualStats);
+        }
+        */
+
+        // expected.minSequenceNumber == min(minSequenceNumber)
+        assertThat(actual.stream().mapToLong(SstFileMeta::minSequenceNumber).min().orElse(-1))
+                .isEqualTo(expected.minSequenceNumber());
+
+        // expected.maxSequenceNumber == max(maxSequenceNumber)
+        assertThat(actual.stream().mapToLong(SstFileMeta::maxSequenceNumber).max().orElse(-1))
+                .isEqualTo(expected.maxSequenceNumber());
+
+        // expected.level == eachFile.level
+        for (SstFileMeta meta : actual) {
+            assertThat(meta.level()).isEqualTo(expected.level());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) {
+        if (expected.minValue() instanceof Comparable) {
+            Object actualMin = null;
+            Object actualMax = null;
+            for (FieldStats stats : actual) {
+                if (stats.minValue() != null
+                        && (actualMin == null
+                                || ((Comparable<Object>) stats.minValue()).compareTo(actualMin)
+                                        < 0)) {
+                    actualMin = stats.minValue();
+                }
+                if (stats.maxValue() != null
+                        && (actualMax == null
+                                || ((Comparable<Object>) stats.maxValue()).compareTo(actualMax)
+                                        > 0)) {
+                    actualMax = stats.maxValue();
+                }
+            }
+            assertThat(actualMin).isEqualTo(expected.minValue());
+            assertThat(actualMax).isEqualTo(expected.maxValue());
+        } else {
+            for (FieldStats stats : actual) {
+                assertThat(stats.minValue()).isNull();
+                assertThat(stats.maxValue()).isNull();
+            }
+        }
+        assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum())
+                .isEqualTo(expected.nullCount());
+    }
+
+    /** A special avro {@link FileFormat} which flushes for every added element. */
+    public static class FlushingAvroFormat implements FileFormat {
+
+        private final FileFormat avro =
+                FileFormat.fromIdentifier(
+                        SstFileTest.class.getClassLoader(), "avro", new Configuration());
+
+        @Override
+        public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+                RowType type, List<ResolvedExpression> filters) {
+            return avro.createReaderFactory(type, filters);
+        }
+
+        @Override
+        public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+            return fsDataOutputStream -> {
+                BulkWriter<RowData> wrapped =
+                        avro.createWriterFactory(type).create(fsDataOutputStream);
+                return new BulkWriter<RowData>() {
+                    @Override
+                    public void addElement(RowData rowData) throws IOException {
+                        wrapped.addElement(rowData);
+                        wrapped.flush();
+                    }
+
+                    @Override
+                    public void flush() throws IOException {
+                        wrapped.flush();
+                    }
+
+                    @Override
+                    public void finish() throws IOException {
+                        wrapped.finish();
+                    }
+                };
+            };
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
new file mode 100644
index 0000000..94b2f08
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/** Random {@link SstFileMeta} generator. */
+public class SstTestDataGenerator {
+
+    private final int numBuckets;
+    private final int memTableCapacity;
+
+    private final List<Map<BinaryRowData, List<KeyValue>>> memTables;
+    private final TestKeyValueGenerator gen;
+
+    private SstTestDataGenerator(int numBuckets, int memTableCapacity) {
+        this.numBuckets = numBuckets;
+        this.memTableCapacity = memTableCapacity;
+
+        this.memTables = new ArrayList<>();
+        for (int i = 0; i < numBuckets; i++) {
+            memTables.add(new HashMap<>());
+        }
+        this.gen = new TestKeyValueGenerator();
+    }
+
+    public SstFile next() {
+        while (true) {
+            KeyValue kv = gen.next();
+            BinaryRowData key = (BinaryRowData) kv.key();
+            BinaryRowData partition = gen.getPartition(kv);
+            int bucket = (key.hashCode() % numBuckets + numBuckets) % numBuckets;
+            List<KeyValue> memTable =
+                    memTables.get(bucket).computeIfAbsent(partition, k -> new ArrayList<>());
+            memTable.add(kv);
+
+            if (memTable.size() >= memTableCapacity) {
+                List<SstFile> result = createSstFiles(memTable, 0, partition, bucket);
+                memTable.clear();
+                assert result.size() == 1;
+                return result.get(0);
+            }
+        }
+    }
+
+    public List<SstFile> createSstFiles(
+            List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
+        gen.sort(kvs);
+        List<KeyValue> combined = new ArrayList<>();
+        for (int i = 0; i + 1 < kvs.size(); i++) {
+            KeyValue now = kvs.get(i);
+            KeyValue next = kvs.get(i + 1);
+            if (!now.key().equals(next.key())) {
+                combined.add(now);
+            }
+        }
+        combined.add(kvs.get(kvs.size() - 1));
+
+        int capacity = memTableCapacity;
+        for (int i = 0; i < level; i++) {
+            capacity *= memTableCapacity;
+        }
+        List<SstFile> result = new ArrayList<>();
+        for (int i = 0; i < combined.size(); i += capacity) {
+            result.add(
+                    createSstFile(
+                            combined.subList(i, Math.min(i + capacity, combined.size())),
+                            level,
+                            partition,
+                            bucket));
+        }
+        return result;
+    }
+
+    private SstFile createSstFile(
+            List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
+        FieldStatsCollector collector = new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+        long totalSize = 0;
+        BinaryRowData minKey = null;
+        BinaryRowData maxKey = null;
+        long minSequenceNumber = Long.MAX_VALUE;
+        long maxSequenceNumber = Long.MIN_VALUE;
+        for (KeyValue kv : kvs) {
+            BinaryRowData key = (BinaryRowData) kv.key();
+            BinaryRowData value = (BinaryRowData) kv.value();
+            totalSize += key.getSizeInBytes() + value.getSizeInBytes();
+            collector.collect(value);
+            if (minKey == null || gen.compareKeys(key, minKey) < 0) {
+                minKey = key;
+            }
+            if (maxKey == null || gen.compareKeys(key, maxKey) > 0) {
+                maxKey = key;
+            }
+            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
+            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
+        }
+
+        return new SstFile(
+                partition,
+                bucket,
+                new SstFileMeta(
+                        "sst-" + UUID.randomUUID(),
+                        totalSize,
+                        kvs.size(),
+                        minKey,
+                        maxKey,
+                        collector.extract(),
+                        minSequenceNumber,
+                        maxSequenceNumber,
+                        level),
+                kvs);
+    }
+
+    /** An in-memory SST file. */
+    public static class SstFile {
+        public final BinaryRowData partition;
+        public final int bucket;
+        public final SstFileMeta meta;
+        public final List<KeyValue> content;
+
+        private SstFile(
+                BinaryRowData partition, int bucket, SstFileMeta meta, List<KeyValue> content) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.meta = meta;
+            this.content = content;
+        }
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Builder for {@link SstTestDataGenerator}. */
+    public static class Builder {
+        private int numBuckets = 3;
+        private int memTableCapacity = 3;
+
+        public Builder numBuckets(int value) {
+            this.numBuckets = value;
+            return this;
+        }
+
+        public Builder memTableCapacity(int value) {
+            this.memTableCapacity = value;
+            return this;
+        }
+
+        public SstTestDataGenerator build() {
+            return new SstTestDataGenerator(numBuckets, memTableCapacity);
+        }
+    }
+}
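
Typical use of the generator, as in the tests above:

    SstTestDataGenerator gen = SstTestDataGenerator.builder().memTableCapacity(20).build();
    SstTestDataGenerator.SstFile data = gen.next();
    // data.meta holds the expected SstFileMeta, data.content the sorted KeyValues
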
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
new file mode 100644
index 0000000..eb3b32a
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataInputStreamWrapper;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FSDataOutputStreamWrapper;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link TestAtomicRenameFileSystem} which may fail when reading and writing. Mainly used to
+ * check if components deal with failures correctly.
+ */
+public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
+
+    public static final String SCHEME = "fail";
+
+    private static final AtomicInteger failCounter = new AtomicInteger();
+    // each wrapped read/write fails with probability 1 / failPossibility
+    private static int failPossibility = 1000;
+
+    public static void resetFailCounter(int maxValue) {
+        failCounter.set(maxValue);
+    }
+
+    public static void setFailPossibility(int v) {
+        failPossibility = v;
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        return new FailingFSDataInputStreamWrapper(super.open(f, bufferSize));
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+        return new FailingFSDataInputStreamWrapper(super.open(f));
+    }
+
+    @Override
+    public FSDataOutputStream create(Path filePath, FileSystem.WriteMode overwrite)
+            throws IOException {
+        return new FailingFSDataOutputStreamWrapper(super.create(filePath, overwrite));
+    }
+
+    @Override
+    public URI getUri() {
+        return URI.create(SCHEME + ":///");
+    }
+
+    /** {@link FileSystemFactory} for {@link FailingAtomicRenameFileSystem}. */
+    public static final class FailingAtomicRenameFileSystemFactory implements FileSystemFactory {
+
+        @Override
+        public String getScheme() {
+            return SCHEME;
+        }
+
+        @Override
+        public FileSystem create(URI uri) throws IOException {
+            return new FailingAtomicRenameFileSystem();
+        }
+    }
+
+    /** Specific {@link IOException} produced by {@link FailingAtomicRenameFileSystem}. */
+    public static final class ArtificialException extends IOException {
+
+        public ArtificialException() {
+            super("Artificial exception");
+        }
+    }
+
+    private static class FailingFSDataInputStreamWrapper extends FSDataInputStreamWrapper {
+
+        public FailingFSDataInputStreamWrapper(FSDataInputStream inputStream) {
+            super(inputStream);
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+                    && failCounter.getAndDecrement() > 0) {
+                throw new ArtificialException();
+            }
+            return super.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+                    && failCounter.getAndDecrement() > 0) {
+                throw new ArtificialException();
+            }
+            return super.read(b, off, len);
+        }
+    }
+
+    private static class FailingFSDataOutputStreamWrapper extends FSDataOutputStreamWrapper {
+
+        public FailingFSDataOutputStreamWrapper(FSDataOutputStream outputStream) {
+            super(outputStream);
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0) {
+                throw new ArtificialException();
+            }
+            super.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0) {
+                throw new ArtificialException();
+            }
+            super.write(b, off, len);
+        }
+    }
+}
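
Failure injection is driven by the two static setters. SstFileTest above, for example, arms
the file system so that each wrapped read or write fails with a 1-in-10 chance (tempDir is
the test's temporary directory):

    FailingAtomicRenameFileSystem.resetFailCounter(1);
    FailingAtomicRenameFileSystem.setFailPossibility(10);
    // any path under the "fail" scheme now goes through the failing file system
    Path path = new Path(FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
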
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
new file mode 100644
index 0000000..2ae6946
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.connector.file.table.RowDataPartitionComputer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.utils.PartitionPathUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/** Factory which produces {@link Path}s for each type of file. */
+public class FileStorePathFactory {
+
+    private final Path root;
+    private final String uuid;
+    @Nullable private final RowDataPartitionComputer partitionComputer;
+
+    private int manifestFileCount;
+    private int manifestListCount;
+
+    public FileStorePathFactory(Path root) {
+        this(root, null, FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.defaultValue());
+    }
+
+    public FileStorePathFactory(
+            Path root, @Nullable RowType partitionType, String defaultPartValue) {
+        this.root = root;
+        this.uuid = UUID.randomUUID().toString();
+
+        if (partitionType == null) {
+            this.partitionComputer = null;
+        } else {
+            String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
+            this.partitionComputer =
+                    new RowDataPartitionComputer(
+                            defaultPartValue,
+                            partitionColumns,
+                            partitionType.getFields().stream()
+                                    .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
+                                    .toArray(DataType[]::new),
+                            partitionColumns);
+        }
+
+        this.manifestFileCount = 0;
+        this.manifestListCount = 0;
+    }
+
+    public Path newManifestFile() {
+        return new Path(root + "/manifest/manifest-" + uuid + "-" + (manifestFileCount++));
+    }
+
+    public Path newManifestList() {
+        return new Path(root + "/manifest/manifest-list-" + uuid + "-" + (manifestListCount++));
+    }
+
+    public Path toManifestFilePath(String manifestFileName) {
+        return new Path(root + "/manifest/" + manifestFileName);
+    }
+
+    public Path toManifestListPath(String manifestListName) {
+        return new Path(root + "/manifest/" + manifestListName);
+    }
+
+    public Path toSnapshotPath(long id) {
+        return new Path(root + "/snapshot/snapshot-" + id);
+    }
+
+    public SstPathFactory createSstPathFactory(@Nullable BinaryRowData partition, int bucket) {
+        return new SstPathFactory(root, getPartitionString(partition), bucket);
+    }
+
+    public @Nullable String getPartitionString(@Nullable BinaryRowData partition) {
+        if (partitionComputer == null) {
+            return null;
+        }
+        return PartitionPathUtils.generatePartitionPath(
+                partitionComputer.generatePartValues(
+                        Preconditions.checkNotNull(
+                                partition, "Partition row data is null. This is unexpected.")));
+    }
+
+    @VisibleForTesting
+    public String uuid() {
+        return uuid;
+    }
+}
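
A quick sketch of how the factory is used (mirroring createSstFile in SstFileTest; the null
partition and bucket 0 match a non-partitioned table, and the root path is hypothetical):

    FileStorePathFactory factory = new FileStorePathFactory(new Path("/tmp/store"));
    SstPathFactory sstPaths = factory.createSstPathFactory(null, 0);
    Path manifest = factory.newManifestFile(); // /tmp/store/manifest/manifest-<uuid>-0
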
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java
new file mode 100644
index 0000000..98f4101
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileStatus;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** A modified {@link LocalFileSystem} supporting atomic rename. */
+public class TestAtomicRenameFileSystem extends LocalFileSystem {
+
+    public static final String SCHEME = "test";
+
+    // the same file system object is cached and shared in the same JVM,
+    // so we can use Java locks to ensure atomic renaming
+    private final ReentrantLock renameLock;
+
+    public TestAtomicRenameFileSystem() {
+        this.renameLock = new ReentrantLock();
+    }
+
+    @Override
+    public boolean rename(final Path src, final Path dst) throws IOException {
+        File srcFile = pathToFile(src);
+        File dstFile = pathToFile(dst);
+        File dstParent = dstFile.getParentFile();
+        dstParent.mkdirs();
+        try {
+            renameLock.lock();
+            Files.move(srcFile.toPath(), dstFile.toPath());
+            return true;
+        } catch (NoSuchFileException
+                | AccessDeniedException
+                | DirectoryNotEmptyException
+                | SecurityException
+                | FileAlreadyExistsException e) {
+            return false;
+        } finally {
+            renameLock.unlock();
+        }
+    }
+
+    @Override
+    public FileStatus[] listStatus(final Path f) throws IOException {
+        // TODO remove this method once FLINK-25453 is fixed
+        File localf = pathToFile(f);
+        if (!localf.exists()) {
+            return null;
+        }
+        if (localf.isFile()) {
+            return new FileStatus[] {new LocalFileStatus(localf, this)};
+        }
+
+        final String[] names = localf.list();
+        if (names == null) {
+            return null;
+        }
+        List<FileStatus> results = new ArrayList<>();
+        for (String name : names) {
+            try {
+                results.add(getFileStatus(new Path(f, name)));
+            } catch (FileNotFoundException e) {
+                // ignore the files not found since the dir list may have changed
+                // since the names[] list was generated.
+            }
+        }
+
+        return results.toArray(new FileStatus[0]);
+    }
+
+    @Override
+    public URI getUri() {
+        return URI.create(SCHEME + ":///");
+    }
+
+    /** {@link FileSystemFactory} for {@link TestAtomicRenameFileSystem}. */
+    public static final class TestAtomicRenameFileSystemFactory implements FileSystemFactory {
+
+        @Override
+        public String getScheme() {
+            return SCHEME;
+        }
+
+        @Override
+        public FileSystem create(URI uri) throws IOException {
+            return new TestAtomicRenameFileSystem();
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..82e0c2d
--- /dev/null
+++ b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem$TestAtomicRenameFileSystemFactory
+org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem$FailingAtomicRenameFileSystemFactory
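
These entries register the two test file systems with Flink's FileSystem factory discovery
(java.util.ServiceLoader), so tests can reach them purely by URI scheme; for example:

    // resolves to TestAtomicRenameFileSystem through the service file above
    FileSystem fs = new Path("test:///tmp/store").getFileSystem();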