Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/16 03:46:03 UTC

[flink-table-store] branch master updated: [FLINK-25994] Implement FileStoreExpire

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ec208b  [FLINK-25994] Implement FileStoreExpire
6ec208b is described below

commit 6ec208bcc180471f141cfa250b92adc6aa95c2b2
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Feb 16 11:45:59 2022 +0800

    [FLINK-25994] Implement FileStoreExpire
    
    This closes #17
---
 .../apache/flink/table/store/file/Snapshot.java    |  13 +-
 .../store/file/operation/FileStoreCommitImpl.java  |   8 +-
 .../store/file/operation/FileStoreExpireImpl.java  | 166 ++++++++++++++++
 .../store/file/utils/FileStorePathFactory.java     |  24 +++
 .../store/file/operation/FileStoreExpireTest.java  | 216 +++++++++++++++++++++
 .../store/file/operation/FileStoreScanTest.java    |  53 +----
 .../store/file/operation/OperationTestUtils.java   |  88 ++++++++-
 7 files changed, 516 insertions(+), 52 deletions(-)
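
In short: every snapshot now records the wall-clock time at which it was committed (see Snapshot.java and FileStoreCommitImpl.java below), and the new FileStoreExpireImpl removes old snapshots subject to two retention constraints. A minimal sketch of the retention rule implied by the patch (latestId, id and retained are illustrative names, not identifiers from the diff):

    // a snapshot survives expiration only if it satisfies BOTH constraints;
    // the latest snapshot is always kept, no matter how old it is
    boolean retained = (latestId - id + 1 <= numRetained)
            && (currentMillis - snapshot.timeMillis() <= millisRetained);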

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 77f79e6..e02fa0d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -39,6 +39,7 @@ public class Snapshot {
     private static final String FIELD_COMMIT_USER = "commitUser";
     private static final String FIELD_COMMIT_DIGEST = "commitDigest";
     private static final String FIELD_COMMIT_KIND = "commitKind";
+    private static final String FIELD_TIME_MILLIS = "timeMillis";
 
     @JsonProperty(FIELD_ID)
     private final long id;
@@ -56,18 +57,23 @@ public class Snapshot {
     @JsonProperty(FIELD_COMMIT_KIND)
     private final CommitKind commitKind;
 
+    @JsonProperty(FIELD_TIME_MILLIS)
+    private final long timeMillis;
+
     @JsonCreator
     public Snapshot(
             @JsonProperty(FIELD_ID) long id,
             @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
             @JsonProperty(FIELD_COMMIT_DIGEST) String commitDigest,
-            @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind) {
+            @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
+            @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
         this.id = id;
         this.manifestList = manifestList;
         this.commitUser = commitUser;
         this.commitDigest = commitDigest;
         this.commitKind = commitKind;
+        this.timeMillis = timeMillis;
     }
 
     @JsonGetter(FIELD_ID)
@@ -95,6 +101,11 @@ public class Snapshot {
         return commitKind;
     }
 
+    @JsonGetter(FIELD_TIME_MILLIS)
+    public long timeMillis() {
+        return timeMillis;
+    }
+
     public String toJson() {
         try {
             return new ObjectMapper().writeValueAsString(this);
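
With this change, a serialized snapshot file will look roughly like the following (the property names are exactly those declared above; all values, including the CommitKind, are illustrative):

    {
      "id" : 5,
      "manifestList" : "manifest-list-...",
      "commitUser" : "default",
      "commitDigest" : "...",
      "commitKind" : "APPEND",
      "timeMillis" : 1644983159000
    }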
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 465c2ec..7c56acd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -243,7 +243,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 // prepare snapshot file
                 manifestListName = manifestList.write(newMetas);
                 newSnapshot =
-                        new Snapshot(newSnapshotId, manifestListName, commitUser, hash, commitKind);
+                        new Snapshot(
+                                newSnapshotId,
+                                manifestListName,
+                                commitUser,
+                                hash,
+                                commitKind,
+                                System.currentTimeMillis());
                 FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
             } catch (Throwable e) {
                 // fails when preparing for commit, we should clean up
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
new file mode 100644
index 0000000..e7a03d9
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Default implementation of {@link FileStoreExpire}. It keeps the latest snapshots, up to a
+ * configured maximum count and within a configured maximum age.
+ *
+ * <p>NOTE: This implementation will keep at least one snapshot so that users will not accidentally
+ * clear all snapshots.
+ */
+public class FileStoreExpireImpl implements FileStoreExpire {
+
+    // snapshots violating either retention constraint will be expired
+    private final int numRetained;
+    private final long millisRetained;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestList manifestList;
+    private final FileStoreScan scan;
+
+    public FileStoreExpireImpl(
+            int numRetained,
+            long millisRetained,
+            FileStorePathFactory pathFactory,
+            ManifestList.Factory manifestListFactory,
+            FileStoreScan scan) {
+        this.numRetained = numRetained;
+        this.millisRetained = millisRetained;
+        this.pathFactory = pathFactory;
+        this.manifestList = manifestListFactory.create();
+        this.scan = scan;
+    }
+
+    @Override
+    public void expire() {
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            // no snapshot, nothing to expire
+            return;
+        }
+
+        long currentMillis = System.currentTimeMillis();
+
+        // find earliest snapshot to retain
+        for (long id = Math.max(latestSnapshotId - numRetained + 1, Snapshot.FIRST_SNAPSHOT_ID);
+                id <= latestSnapshotId;
+                id++) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            try {
+                if (snapshotPath.getFileSystem().exists(snapshotPath)
+                        && currentMillis - Snapshot.fromPath(snapshotPath).timeMillis()
+                                <= millisRetained) {
+                    // this snapshot is within the time threshold, so all snapshots after it
+                    // must also be within the threshold
+                    expireUntil(id);
+                    return;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Failed to determine if snapshot #" + id + " still exists", e);
+            }
+        }
+
+        // no snapshot can be retained, expire all but the last one
+        expireUntil(latestSnapshotId);
+    }
+
+    private void expireUntil(long exclusiveId) {
+        if (exclusiveId <= Snapshot.FIRST_SNAPSHOT_ID) {
+            // fast exit
+            return;
+        }
+
+        Snapshot exclusiveSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(exclusiveId));
+
+        // if an sst file is used only by the snapshots to expire, and not by the next
+        // snapshot, we can delete it, because each sst file is added and deleted exactly once
+        Set<Path> sstInUse = new HashSet<>();
+        FileStorePathFactory.SstPathFactoryCache sstPathFactoryCache =
+                new FileStorePathFactory.SstPathFactoryCache(pathFactory);
+        for (ManifestEntry entry : scan.withSnapshot(exclusiveId).plan().files()) {
+            SstPathFactory sstPathFactory =
+                    sstPathFactoryCache.getSstPathFactory(entry.partition(), entry.bucket());
+            sstInUse.add(sstPathFactory.toPath(entry.file().fileName()));
+        }
+
+        // as with sst files, manifests are only added and deleted once
+        Set<ManifestFileMeta> manifestsInUse =
+                new HashSet<>(manifestList.read(exclusiveSnapshot.manifestList()));
+
+        Set<Path> sstToDelete = new HashSet<>();
+        Set<String> manifestsToDelete = new HashSet<>();
+
+        for (long id = exclusiveId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            try {
+                if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
+                    // only the latest snapshots are retained; as this snapshot cannot be
+                    // found, all snapshots preceding it must have been removed already
+                    break;
+                }
+
+                Snapshot toExpire = Snapshot.fromPath(snapshotPath);
+
+                for (ManifestEntry entry : scan.withSnapshot(toExpire.id()).plan().files()) {
+                    SstPathFactory sstPathFactory =
+                            sstPathFactoryCache.getSstPathFactory(
+                                    entry.partition(), entry.bucket());
+                    Path sstPath = sstPathFactory.toPath(entry.file().fileName());
+                    if (!sstInUse.contains(sstPath)) {
+                        sstToDelete.add(sstPath);
+                    }
+                }
+
+                for (ManifestFileMeta manifest : manifestList.read(toExpire.manifestList())) {
+                    if (!manifestsInUse.contains(manifest)) {
+                        manifestsToDelete.add(manifest.fileName());
+                    }
+                }
+
+                manifestList.delete(toExpire.manifestList());
+                FileUtils.deleteOrWarn(snapshotPath);
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Failed to expire snapshot #" + id, e);
+            }
+        }
+
+        for (Path sst : sstToDelete) {
+            FileUtils.deleteOrWarn(sst);
+        }
+        for (String manifestName : manifestsToDelete) {
+            FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(manifestName));
+        }
+    }
+}
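
Wiring this up looks like the following sketch, where pathFactory, manifestListFactory and scan are assumed to come from the surrounding FileStore setup (mirroring OperationTestUtils.createExpire in the tests below):

    // keep at most 3 snapshots, and none older than one hour
    FileStoreExpire expire =
            new FileStoreExpireImpl(
                    3,                // numRetained
                    60 * 60 * 1000L,  // millisRetained: one hour
                    pathFactory,
                    manifestListFactory,
                    scan);
    expire.expire(); // safe to call repeatedly; at least the latest snapshot survives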
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 679e367..ab6a384 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -40,6 +40,8 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -152,4 +154,26 @@ public class FileStorePathFactory {
     public String uuid() {
         return uuid;
     }
+
+    /** Cache for storing {@link SstPathFactory}s. */
+    public static class SstPathFactoryCache {
+
+        private final FileStorePathFactory pathFactory;
+        private final Map<BinaryRowData, Map<Integer, SstPathFactory>> cache;
+
+        public SstPathFactoryCache(FileStorePathFactory pathFactory) {
+            this.pathFactory = pathFactory;
+            this.cache = new HashMap<>();
+        }
+
+        public SstPathFactory getSstPathFactory(BinaryRowData partition, int bucket) {
+            return cache.compute(partition, (p, m) -> m == null ? new HashMap<>() : m)
+                    .compute(
+                            bucket,
+                            (b, f) ->
+                                    f == null
+                                            ? pathFactory.createSstPathFactory(partition, bucket)
+                                            : f);
+        }
+    }
 }
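
The cache is keyed by partition first, then by bucket; each nested compute call behaves like computeIfAbsent here. Note that the backing maps are plain HashMaps, so the cache is intended for single-threaded use, as in FileStoreExpireImpl above:

    // usage sketch, mirroring FileStoreExpireImpl (entry is a ManifestEntry)
    FileStorePathFactory.SstPathFactoryCache cache =
            new FileStorePathFactory.SstPathFactoryCache(pathFactory);
    SstPathFactory sstPathFactory =
            cache.getSstPathFactory(entry.partition(), entry.bucket());
    Path sstPath = sstPathFactory.toPath(entry.file().fileName());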
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
new file mode 100644
index 0000000..16a7ab1
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStoreExpireImpl}. */
+public class FileStoreExpireTest {
+
+    private final FileFormat avro =
+            FileFormat.fromIdentifier(
+                    FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
+
+    private TestKeyValueGenerator gen;
+    @TempDir java.nio.file.Path tempDir;
+    private FileStorePathFactory pathFactory;
+
+    @BeforeEach
+    public void beforeEach() throws IOException {
+        gen = new TestKeyValueGenerator();
+        pathFactory =
+                OperationTestUtils.createPathFactory(
+                        TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+        Path root = new Path(tempDir.toString());
+        root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+    }
+
+    @Test
+    public void testNoSnapshot() {
+        FileStoreExpire expire =
+                OperationTestUtils.createExpire(3, Long.MAX_VALUE, avro, pathFactory);
+        expire.expire();
+
+        assertThat(pathFactory.latestSnapshotId()).isNull();
+    }
+
+    @Test
+    public void testNotEnoughSnapshots() throws Exception {
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+        commit(2, allData, snapshotPositions);
+        int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+        FileStoreExpire expire =
+                OperationTestUtils.createExpire(
+                        latestSnapshotId + 1, Long.MAX_VALUE, avro, pathFactory);
+        expire.expire();
+
+        FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+        for (int i = 1; i <= latestSnapshotId; i++) {
+            assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+            assertSnapshot(i, allData, snapshotPositions);
+        }
+    }
+
+    @Test
+    public void testNeverExpire() throws Exception {
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+        commit(5, allData, snapshotPositions);
+        int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+        FileStoreExpire expire =
+                OperationTestUtils.createExpire(
+                        Integer.MAX_VALUE, Long.MAX_VALUE, avro, pathFactory);
+        expire.expire();
+
+        FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+        for (int i = 1; i <= latestSnapshotId; i++) {
+            assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+            assertSnapshot(i, allData, snapshotPositions);
+        }
+    }
+
+    @Test
+    public void testKeepAtLeastOneSnapshot() throws Exception {
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+        commit(3, allData, snapshotPositions);
+        int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+        Thread.sleep(100);
+        FileStoreExpire expire =
+                OperationTestUtils.createExpire(Integer.MAX_VALUE, 1, avro, pathFactory);
+        expire.expire();
+
+        FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+        for (int i = 1; i < latestSnapshotId; i++) {
+            assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isFalse();
+        }
+        assertThat(fs.exists(pathFactory.toSnapshotPath(latestSnapshotId))).isTrue();
+        assertSnapshot(latestSnapshotId, allData, snapshotPositions);
+    }
+
+    @Test
+    public void testExpireWithNumber() throws Exception {
+        FileStoreExpire expire =
+                OperationTestUtils.createExpire(3, Long.MAX_VALUE, avro, pathFactory);
+
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+        for (int i = 1; i <= 3; i++) {
+            commit(ThreadLocalRandom.current().nextInt(5) + 1, allData, snapshotPositions);
+            expire.expire();
+
+            int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+            FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+            for (int j = 1; j <= latestSnapshotId; j++) {
+                if (j > latestSnapshotId - 3) {
+                    assertThat(fs.exists(pathFactory.toSnapshotPath(j))).isTrue();
+                    assertSnapshot(j, allData, snapshotPositions);
+                } else {
+                    assertThat(fs.exists(pathFactory.toSnapshotPath(j))).isFalse();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testExpireWithTime() throws Exception {
+        FileStoreExpire expire =
+                OperationTestUtils.createExpire(Integer.MAX_VALUE, 1000, avro, pathFactory);
+
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+        commit(5, allData, snapshotPositions);
+        Thread.sleep(1500);
+        commit(5, allData, snapshotPositions);
+        long expireMillis = System.currentTimeMillis();
+        // expire twice to check for idempotence
+        expire.expire();
+        expire.expire();
+
+        int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+        FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+        for (int i = 1; i <= latestSnapshotId; i++) {
+            Path snapshotPath = pathFactory.toSnapshotPath(i);
+            if (fs.exists(snapshotPath)) {
+                assertThat(Snapshot.fromPath(snapshotPath).timeMillis())
+                        .isBetween(expireMillis - 1000, expireMillis);
+                assertSnapshot(i, allData, snapshotPositions);
+            }
+        }
+    }
+
+    private void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions)
+            throws Exception {
+        for (int i = 0; i < numCommits; i++) {
+            int numRecords = ThreadLocalRandom.current().nextInt(100) + 1;
+            List<KeyValue> data = new ArrayList<>();
+            for (int j = 0; j < numRecords; j++) {
+                data.add(gen.next());
+            }
+            allData.addAll(data);
+            List<Snapshot> snapshots =
+                    OperationTestUtils.writeAndCommitData(
+                            data, gen::getPartition, kv -> 0, avro, pathFactory);
+            for (int j = 0; j < snapshots.size(); j++) {
+                snapshotPositions.add(allData.size());
+            }
+        }
+    }
+
+    private void assertSnapshot(
+            int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions)
+            throws Exception {
+        Map<BinaryRowData, BinaryRowData> expected =
+                OperationTestUtils.toKvMap(
+                        allData.subList(0, snapshotPositions.get(snapshotId - 1)));
+        List<KeyValue> actualKvs =
+                OperationTestUtils.readKvsFromManifestEntries(
+                        OperationTestUtils.createScan(avro, pathFactory)
+                                .withSnapshot(snapshotId)
+                                .plan()
+                                .files(),
+                        avro,
+                        pathFactory);
+        gen.sort(actualKvs);
+        Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+        assertThat(actual).isEqualTo(expected);
+    }
+}
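
The bookkeeping shared by these tests works as follows: commit(...) appends one entry to snapshotPositions for every snapshot produced, recording how many KeyValues were visible at that point, and assertSnapshot(...) replays that prefix to reconstruct the expected contents of any surviving snapshot:

    // the invariant maintained by commit(...): snapshot s contains exactly the
    // first snapshotPositions.get(s - 1) records of allData, hence (from above):
    Map<BinaryRowData, BinaryRowData> expected =
            OperationTestUtils.toKvMap(
                    allData.subList(0, snapshotPositions.get(snapshotId - 1)));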
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 251c853..8a12007 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -25,12 +25,9 @@ import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -40,14 +37,10 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -205,48 +198,10 @@ public class FileStoreScanTest {
     }
 
     private Snapshot writeData(List<KeyValue> kvs) throws Exception {
-        FileStoreWrite write = OperationTestUtils.createWrite(avro, pathFactory);
-        Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
-        for (KeyValue kv : kvs) {
-            BinaryRowData partition = gen.getPartition(kv);
-            int bucket = getBucket(kv);
-            writers.compute(partition, (p, m) -> m == null ? new HashMap<>() : m)
-                    .compute(
-                            bucket,
-                            (b, w) -> {
-                                if (w == null) {
-                                    ExecutorService service = Executors.newSingleThreadExecutor();
-                                    return write.createWriter(partition, bucket, service);
-                                } else {
-                                    return w;
-                                }
-                            })
-                    .write(kv.valueKind(), kv.key(), kv.value());
-        }
-
-        FileStoreCommit commit = OperationTestUtils.createCommit(avro, pathFactory);
-        ManifestCommittable committable = new ManifestCommittable();
-        for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
-                writers.entrySet()) {
-            for (Map.Entry<Integer, RecordWriter> entryWithBucket :
-                    entryWithPartition.getValue().entrySet()) {
-                Increment increment = entryWithBucket.getValue().prepareCommit();
-                committable.add(entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
-            }
-        }
-        commit.commit(committable, Collections.emptyMap());
-        writers.values().stream()
-                .flatMap(m -> m.values().stream())
-                .forEach(
-                        w -> {
-                            try {
-                                w.close();
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-
-        return Snapshot.fromPath(pathFactory.toSnapshotPath(pathFactory.latestSnapshotId()));
+        List<Snapshot> snapshots =
+                OperationTestUtils.writeAndCommitData(
+                        kvs, gen::getPartition, this::getBucket, avro, pathFactory);
+        return snapshots.get(snapshots.size() - 1);
     }
 
     private int getBucket(KeyValue kv) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index bce303b..ef93d26 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -25,25 +25,33 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.mergetree.MergeTreeFactory;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
 import org.apache.flink.table.store.file.mergetree.sst.SstFile;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
 
 /** Utils for operation tests. */
 public class OperationTestUtils {
@@ -106,11 +114,24 @@ public class OperationTestUtils {
                         new DeduplicateAccumulator(),
                         fileFormat,
                         pathFactory,
-                        OperationTestUtils.getMergeTreeOptions(false));
+                        getMergeTreeOptions(false));
         return new FileStoreWriteImpl(
                 pathFactory, mergeTreeFactory, createScan(fileFormat, pathFactory));
     }
 
+    public static FileStoreExpire createExpire(
+            int numRetained,
+            long millisRetained,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory) {
+        return new FileStoreExpireImpl(
+                numRetained,
+                millisRetained,
+                pathFactory,
+                createManifestListFactory(fileFormat, pathFactory),
+                createScan(fileFormat, pathFactory));
+    }
+
     public static FileStorePathFactory createPathFactory(String scheme, String root) {
         return new FileStorePathFactory(
                 new Path(scheme + "://" + root), TestKeyValueGenerator.PARTITION_TYPE, "default");
@@ -132,6 +153,71 @@ public class OperationTestUtils {
                 TestKeyValueGenerator.PARTITION_TYPE, fileFormat, pathFactory);
     }
 
+    public static List<Snapshot> writeAndCommitData(
+            List<KeyValue> kvs,
+            Function<KeyValue, BinaryRowData> partitionCalculator,
+            Function<KeyValue, Integer> bucketCalculator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory)
+            throws Exception {
+        FileStoreWrite write = createWrite(fileFormat, pathFactory);
+        Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
+        for (KeyValue kv : kvs) {
+            BinaryRowData partition = partitionCalculator.apply(kv);
+            int bucket = bucketCalculator.apply(kv);
+            writers.compute(partition, (p, m) -> m == null ? new HashMap<>() : m)
+                    .compute(
+                            bucket,
+                            (b, w) -> {
+                                if (w == null) {
+                                    ExecutorService service = Executors.newSingleThreadExecutor();
+                                    return write.createWriter(partition, bucket, service);
+                                } else {
+                                    return w;
+                                }
+                            })
+                    .write(kv.valueKind(), kv.key(), kv.value());
+        }
+
+        FileStoreCommit commit = createCommit(fileFormat, pathFactory);
+        ManifestCommittable committable = new ManifestCommittable();
+        for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
+                writers.entrySet()) {
+            for (Map.Entry<Integer, RecordWriter> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                Increment increment = entryWithBucket.getValue().prepareCommit();
+                committable.add(entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
+            }
+        }
+
+        Long snapshotIdBeforeCommit = pathFactory.latestSnapshotId();
+        if (snapshotIdBeforeCommit == null) {
+            snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
+        }
+        commit.commit(committable, Collections.emptyMap());
+        Long snapshotIdAfterCommit = pathFactory.latestSnapshotId();
+        if (snapshotIdAfterCommit == null) {
+            snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
+        }
+
+        writers.values().stream()
+                .flatMap(m -> m.values().stream())
+                .forEach(
+                        w -> {
+                            try {
+                                w.close();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        List<Snapshot> snapshots = new ArrayList<>();
+        for (long id = snapshotIdBeforeCommit + 1; id <= snapshotIdAfterCommit; id++) {
+            snapshots.add(Snapshot.fromPath(pathFactory.toSnapshotPath(id)));
+        }
+        return snapshots;
+    }
+
     public static List<KeyValue> readKvsFromSnapshot(
             long snapshotId, FileFormat fileFormat, FileStorePathFactory pathFactory)
             throws IOException {
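
One detail worth noting in the new writeAndCommitData helper above: it records the latest snapshot id both before and after the commit, so it returns exactly the snapshots produced by this commit, even if the id advances by more than one. Usage, as in FileStoreScanTest:

    // write a batch of KeyValues, commit, and collect the resulting snapshots
    List<Snapshot> snapshots =
            OperationTestUtils.writeAndCommitData(
                    kvs, gen::getPartition, this::getBucket, avro, pathFactory);
    Snapshot latest = snapshots.get(snapshots.size() - 1);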