You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/03 06:18:13 UTC

[flink-table-store] branch master updated: [FLINK-26247] Optimize expire by only reading new changes in a snapshot

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 57134d3  [FLINK-26247] Optimize expire by only reading new changes in a snapshot
57134d3 is described below

commit 57134d35fdd7bf3db1fd5048207fc4bacf198667
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Mar 3 14:18:06 2022 +0800

    [FLINK-26247] Optimize expire by only reading new changes in a snapshot
    
    This closes #30
---
 .../flink/table/store/file/FileStoreImpl.java      |   7 +-
 .../apache/flink/table/store/file/Snapshot.java    |  41 +++++--
 .../table/store/file/manifest/ManifestFile.java    |   4 -
 .../store/file/operation/FileStoreCommitImpl.java  |  38 ++++--
 .../store/file/operation/FileStoreExpireImpl.java  | 128 +++++++++++++--------
 .../store/file/operation/FileStoreScanImpl.java    |   2 +-
 .../flink/table/store/file/utils/FileUtils.java    |   4 +
 .../flink/table/store/file/TestFileStore.java      |  87 +++++++++++++-
 .../store/file/operation/FileStoreExpireTest.java  |   6 +
 .../store/file/operation/FileStoreScanTest.java    |  10 +-
 10 files changed, 245 insertions(+), 82 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index 3249c35..d670ef6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -78,7 +78,8 @@ public class FileStoreImpl implements FileStore {
                 options.path(), partitionType, options.partitionDefaultName());
     }
 
-    private ManifestFile.Factory manifestFileFactory() {
+    @VisibleForTesting
+    public ManifestFile.Factory manifestFileFactory() {
         return new ManifestFile.Factory(
                 partitionType,
                 keyType,
@@ -141,8 +142,8 @@ public class FileStoreImpl implements FileStore {
                 options.snapshotNumRetain(),
                 options.snapshotTimeRetain().toMillis(),
                 pathFactory(),
-                manifestListFactory(),
-                newScan());
+                manifestFileFactory(),
+                manifestListFactory());
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index e30286d..6bb343b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.store.file;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.utils.FileUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -28,6 +30,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessin
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /** This file is the entrance to all data committed at some specific time point. */
 public class Snapshot {
@@ -35,7 +39,8 @@ public class Snapshot {
     public static final long FIRST_SNAPSHOT_ID = 1;
 
     private static final String FIELD_ID = "id";
-    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList";
+    private static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList";
     private static final String FIELD_COMMIT_USER = "commitUser";
     private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
     private static final String FIELD_COMMIT_KIND = "commitKind";
@@ -44,8 +49,14 @@ public class Snapshot {
     @JsonProperty(FIELD_ID)
     private final long id;
 
-    @JsonProperty(FIELD_MANIFEST_LIST)
-    private final String manifestList;
+    // a manifest list recording all changes from the previous snapshots
+    @JsonProperty(FIELD_BASE_MANIFEST_LIST)
+    private final String baseManifestList;
+
+    // a manifest list recording all new changes that occurred in this snapshot,
+    // kept separately for faster expire and streaming reads
+    @JsonProperty(FIELD_DELTA_MANIFEST_LIST)
+    private final String deltaManifestList;
 
     @JsonProperty(FIELD_COMMIT_USER)
     private final String commitUser;
@@ -63,13 +74,15 @@ public class Snapshot {
     @JsonCreator
     public Snapshot(
             @JsonProperty(FIELD_ID) long id,
-            @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
+            @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
+            @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
             @JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
             @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
         this.id = id;
-        this.manifestList = manifestList;
+        this.baseManifestList = baseManifestList;
+        this.deltaManifestList = deltaManifestList;
         this.commitUser = commitUser;
         this.commitIdentifier = commitIdentifier;
         this.commitKind = commitKind;
@@ -81,9 +94,14 @@ public class Snapshot {
         return id;
     }
 
-    @JsonGetter(FIELD_MANIFEST_LIST)
-    public String manifestList() {
-        return manifestList;
+    @JsonGetter(FIELD_BASE_MANIFEST_LIST)
+    public String baseManifestList() {
+        return baseManifestList;
+    }
+
+    @JsonGetter(FIELD_DELTA_MANIFEST_LIST)
+    public String deltaManifestList() {
+        return deltaManifestList;
     }
 
     @JsonGetter(FIELD_COMMIT_USER)
@@ -114,6 +132,13 @@ public class Snapshot {
         }
     }
 
+    public List<ManifestFileMeta> readAllManifests(ManifestList manifestList) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        result.addAll(manifestList.read(baseManifestList));
+        result.addAll(manifestList.read(deltaManifestList));
+        return result;
+    }
+
     public static Snapshot fromJson(String json) {
         try {
             return new ObjectMapper().readValue(json, Snapshot.class);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 6382817..57ec343 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RollingFile;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,9 +89,6 @@ public class ManifestFile {
      * <p>NOTE: This method is atomic.
      */
     public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
-        Preconditions.checkArgument(
-                entries.size() > 0, "Manifest entries to write must not be empty.");
-
         ManifestRollingFile rollingFile = new ManifestRollingFile();
         List<ManifestFileMeta> result = new ArrayList<>();
         List<Path> filesToCleanUp = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 1bfdf1b..9d0212f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -301,13 +301,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         Snapshot newSnapshot;
-        String manifestListName = null;
+        String previousChangesListName = null;
+        String newChangesListName = null;
         List<ManifestFileMeta> oldMetas = new ArrayList<>();
         List<ManifestFileMeta> newMetas = new ArrayList<>();
         try {
             if (latestSnapshot != null) {
                 // read all previous manifest files
-                oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+                oldMetas.addAll(latestSnapshot.readAllManifests(manifestList));
             }
             // merge manifest files with changes
             newMetas.addAll(
@@ -316,16 +317,19 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             manifestFile,
                             manifestTargetSize.getBytes(),
                             manifestMergeMinCount));
+            previousChangesListName = manifestList.write(newMetas);
+
             // write new changes into manifest files
-            if (!changes.isEmpty()) {
-                newMetas.addAll(manifestFile.write(changes));
-            }
+            List<ManifestFileMeta> newChangesManifests = manifestFile.write(changes);
+            newMetas.addAll(newChangesManifests);
+            newChangesListName = manifestList.write(newChangesManifests);
+
             // prepare snapshot file
-            manifestListName = manifestList.write(newMetas);
             newSnapshot =
                     new Snapshot(
                             newSnapshotId,
-                            manifestListName,
+                            previousChangesListName,
+                            newChangesListName,
                             commitUser,
                             identifier,
                             commitKind,
@@ -333,7 +337,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
         } catch (Throwable e) {
             // fails when preparing for commit, we should clean up
-            cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+            cleanUpTmpSnapshot(
+                    tmpSnapshotPath,
+                    previousChangesListName,
+                    newChangesListName,
+                    oldMetas,
+                    newMetas);
             throw new RuntimeException(
                     String.format(
                             "Exception occurs when preparing snapshot #%d (path %s) by user %s "
@@ -406,7 +415,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         commitUser,
                         identifier,
                         commitKind.name()));
-        cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+        cleanUpTmpSnapshot(
+                tmpSnapshotPath, previousChangesListName, newChangesListName, oldMetas, newMetas);
         return false;
     }
 
@@ -457,14 +467,18 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     private void cleanUpTmpSnapshot(
             Path tmpSnapshotPath,
-            String manifestListName,
+            String previousChangesListName,
+            String newChangesListName,
             List<ManifestFileMeta> oldMetas,
             List<ManifestFileMeta> newMetas) {
         // clean up tmp snapshot file
         FileUtils.deleteOrWarn(tmpSnapshotPath);
         // clean up newly created manifest list
-        if (manifestListName != null) {
-            manifestList.delete(manifestListName);
+        if (previousChangesListName != null) {
+            manifestList.delete(previousChangesListName);
+        }
+        if (newChangesListName != null) {
+            manifestList.delete(newChangesListName);
         }
         // clean up newly merged manifest files
         Set<ManifestFileMeta> oldMetaSet = new HashSet<>(oldMetas); // for faster searching
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index e7a03d9..e6dd7e3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -21,14 +21,19 @@ package org.apache.flink.table.store.file.operation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -40,25 +45,27 @@ import java.util.Set;
  */
 public class FileStoreExpireImpl implements FileStoreExpire {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreExpireImpl.class);
+
     // snapshots exceeding any constraint will be expired
     private final int numRetained;
     private final long millisRetained;
 
     private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
     private final ManifestList manifestList;
-    private final FileStoreScan scan;
 
     public FileStoreExpireImpl(
             int numRetained,
             long millisRetained,
             FileStorePathFactory pathFactory,
-            ManifestList.Factory manifestListFactory,
-            FileStoreScan scan) {
+            ManifestFile.Factory manifestFileFactory,
+            ManifestList.Factory manifestListFactory) {
         this.numRetained = numRetained;
         this.millisRetained = millisRetained;
         this.pathFactory = pathFactory;
+        this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
-        this.scan = scan;
     }
 
     @Override
@@ -95,72 +102,99 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         expireUntil(latestSnapshotId);
     }
 
-    private void expireUntil(long exclusiveId) {
-        if (exclusiveId <= Snapshot.FIRST_SNAPSHOT_ID) {
+    private void expireUntil(long endExclusiveId) {
+        if (endExclusiveId <= Snapshot.FIRST_SNAPSHOT_ID) {
             // fast exit
             return;
         }
 
-        Snapshot exclusiveSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(exclusiveId));
-
-        // if sst file is only used in snapshots to expire but not in next snapshot we can delete it
-        // because each sst file will only be added and deleted once
-        Set<Path> sstInUse = new HashSet<>();
-        FileStorePathFactory.SstPathFactoryCache sstPathFactoryCache =
-                new FileStorePathFactory.SstPathFactoryCache(pathFactory);
-        for (ManifestEntry entry : scan.withSnapshot(exclusiveId).plan().files()) {
-            SstPathFactory sstPathFactory =
-                    sstPathFactoryCache.getSstPathFactory(entry.partition(), entry.bucket());
-            sstInUse.add(sstPathFactory.toPath(entry.file().fileName()));
-        }
-
-        // the same with sst, manifests are only added and deleted once
-        Set<ManifestFileMeta> manifestsInUse =
-                new HashSet<>(manifestList.read(exclusiveSnapshot.manifestList()));
-
-        Set<Path> sstToDelete = new HashSet<>();
-        Set<String> manifestsToDelete = new HashSet<>();
-
-        for (long id = exclusiveId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+        // find first snapshot to expire
+        long beginInclusiveId = Snapshot.FIRST_SNAPSHOT_ID;
+        for (long id = endExclusiveId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
             Path snapshotPath = pathFactory.toSnapshotPath(id);
             try {
                 if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
                     // only latest snapshots are retained, as we cannot find this snapshot, we can
                     // assume that all snapshots preceding it have been removed
+                    beginInclusiveId = id + 1;
                     break;
                 }
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Failed to determine if snapshot #" + id + " still exists", e);
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
+        }
 
-                Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+        // delete sst files
+        FileStorePathFactory.SstPathFactoryCache sstPathFactoryCache =
+                new FileStorePathFactory.SstPathFactoryCache(pathFactory);
+        // deleted sst files in a snapshot are not used by that snapshot, so the range of id should
+        // be (beginInclusiveId, endExclusiveId]
+        for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Ready to delete sst files in snapshot #" + id);
+            }
+
+            Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+            List<ManifestFileMeta> deltaManifests = manifestList.read(toExpire.deltaManifestList());
 
-                for (ManifestEntry entry : scan.withSnapshot(toExpire.id()).plan().files()) {
+            // we cannot delete an sst file directly when we meet a DELETE entry, because that
+            // file might be upgraded
+            Set<Path> sstToDelete = new HashSet<>();
+            for (ManifestFileMeta meta : deltaManifests) {
+                for (ManifestEntry entry : manifestFile.read(meta.fileName())) {
                     SstPathFactory sstPathFactory =
                             sstPathFactoryCache.getSstPathFactory(
                                     entry.partition(), entry.bucket());
                     Path sstPath = sstPathFactory.toPath(entry.file().fileName());
-                    if (!sstInUse.contains(sstPath)) {
-                        sstToDelete.add(sstPath);
+                    switch (entry.kind()) {
+                        case ADD:
+                            sstToDelete.remove(sstPath);
+                            break;
+                        case DELETE:
+                            sstToDelete.add(sstPath);
+                            break;
+                        default:
+                            throw new UnsupportedOperationException(
+                                    "Unknown value kind " + entry.kind().name());
                     }
                 }
+            }
+            for (Path sst : sstToDelete) {
+                FileUtils.deleteOrWarn(sst);
+            }
+        }
 
-                for (ManifestFileMeta manifest : manifestList.read(toExpire.manifestList())) {
-                    if (!manifestsInUse.contains(manifest)) {
-                        manifestsToDelete.add(manifest.fileName());
-                    }
-                }
+        // delete manifests
+        Snapshot exclusiveSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(endExclusiveId));
+        Set<ManifestFileMeta> manifestsInUse =
+                new HashSet<>(exclusiveSnapshot.readAllManifests(manifestList));
+        // to avoid deleting twice
+        Set<ManifestFileMeta> deletedManifests = new HashSet<>();
+        for (long id = beginInclusiveId; id < endExclusiveId; id++) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Ready to delete manifests in snapshot #" + id);
+            }
 
-                manifestList.delete(toExpire.manifestList());
-                FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
-            } catch (IOException e) {
-                throw new RuntimeException(
-                        "Failed to determine if snapshot #" + id + " still exists", e);
+            Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+
+            for (ManifestFileMeta manifest : toExpire.readAllManifests(manifestList)) {
+                if (!manifestsInUse.contains(manifest) && !deletedManifests.contains(manifest)) {
+                    manifestFile.delete(manifest.fileName());
+                    deletedManifests.add(manifest);
+                }
             }
-        }
 
-        for (Path sst : sstToDelete) {
-            FileUtils.deleteOrWarn(sst);
-        }
-        for (String manifestName : manifestsToDelete) {
-            FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(manifestName));
+            // delete manifest lists
+            manifestList.delete(toExpire.baseManifestList());
+            manifestList.delete(toExpire.deltaManifestList());
+
+            // delete snapshot
+            FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
         }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index 580d5af..d99a792 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -155,7 +155,7 @@ public class FileStoreScanImpl implements FileStoreScan {
                 manifests = Collections.emptyList();
             } else {
                 Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
-                manifests = manifestList.read(snapshot.manifestList());
+                manifests = snapshot.readAllManifests(manifestList);
             }
         }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
index 50cce1d..e797a28 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -110,6 +110,10 @@ public class FileUtils {
     }
 
     public static void deleteOrWarn(Path file) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to delete " + file.toString());
+        }
+
         try {
             FileSystem fs = file.getFileSystem();
             if (!fs.delete(file, false) && fs.exists(file)) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 38b4057..0057435 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -20,10 +20,13 @@ package org.apache.flink.table.store.file;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
@@ -31,6 +34,7 @@ import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -42,24 +46,32 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** {@link FileStore} for tests. */
 public class TestFileStore extends FileStoreImpl {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestFileStore.class);
 
+    private final String root;
     private final RowDataSerializer keySerializer;
     private final RowDataSerializer valueSerializer;
 
@@ -96,13 +108,18 @@ public class TestFileStore extends FileStoreImpl {
             RowType valueType,
             Accumulator accumulator) {
         super(conf, UUID.randomUUID().toString(), partitionType, keyType, valueType, accumulator);
+        this.root = conf.getString(FileStoreOptions.FILE_PATH);
         this.keySerializer = new RowDataSerializer(keyType);
         this.valueSerializer = new RowDataSerializer(valueType);
     }
 
     public FileStoreExpireImpl newExpire(int numRetained, long millisRetained) {
         return new FileStoreExpireImpl(
-                numRetained, millisRetained, pathFactory(), manifestListFactory(), newScan());
+                numRetained,
+                millisRetained,
+                pathFactory(),
+                manifestFileFactory(),
+                manifestListFactory());
     }
 
     public List<Snapshot> commitData(
@@ -264,4 +281,72 @@ public class TestFileStore extends FileStoreImpl {
         }
         return result;
     }
+
+    public void assertCleaned() {
+        Set<Path> filesInUse = getFilesInUse();
+        Set<Path> actualFiles;
+        try {
+            actualFiles =
+                    Files.walk(Paths.get(root))
+                            .filter(p -> Files.isRegularFile(p))
+                            .map(p -> new Path(p.toString()))
+                            .collect(Collectors.toSet());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        assertThat(actualFiles).isEqualTo(filesInUse);
+    }
+
+    private Set<Path> getFilesInUse() {
+        FileStorePathFactory pathFactory = pathFactory();
+        ManifestList manifestList = manifestListFactory().create();
+        FileStoreScan scan = newScan();
+        FileStorePathFactory.SstPathFactoryCache sstPathFactoryCache =
+                new FileStorePathFactory.SstPathFactoryCache(pathFactory);
+
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return Collections.emptySet();
+        }
+
+        long firstInUseSnapshotId = Snapshot.FIRST_SNAPSHOT_ID;
+        for (long id = latestSnapshotId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            try {
+                if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
+                    firstInUseSnapshotId = id + 1;
+                    break;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        Set<Path> result = new HashSet<>();
+        for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+
+            // snapshot file
+            result.add(snapshotPath);
+
+            // manifest lists
+            result.add(pathFactory.toManifestListPath(snapshot.baseManifestList()));
+            result.add(pathFactory.toManifestListPath(snapshot.deltaManifestList()));
+
+            // manifests
+            List<ManifestFileMeta> manifests = snapshot.readAllManifests(manifestList);
+            manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
+
+            // sst
+            List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
+            for (ManifestEntry entry : entries) {
+                result.add(
+                        sstPathFactoryCache
+                                .getSstPathFactory(entry.partition(), entry.bucket())
+                                .toPath(entry.file().fileName()));
+            }
+        }
+        return result;
+    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index e007ca0..bb12a2f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -65,6 +66,11 @@ public class FileStoreExpireTest {
         pathFactory = store.pathFactory();
     }
 
+    @AfterEach
+    public void afterEach() {
+        store.assertCleaned();
+    }
+
     @Test
     public void testNoSnapshot() {
         FileStoreExpire expire = store.newExpire(3, Long.MAX_VALUE);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index bde976a..1c5d1b6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -161,16 +161,14 @@ public class FileStoreScanTest {
         }
 
         ManifestList manifestList = store.manifestListFactory().create();
-        long wantedSnapshot = random.nextLong(pathFactory.latestSnapshotId()) + 1;
-        List<ManifestFileMeta> wantedManifests =
-                manifestList.read(
-                        Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshot))
-                                .manifestList());
+        long wantedSnapshotId = random.nextLong(pathFactory.latestSnapshotId()) + 1;
+        Snapshot wantedSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshotId));
+        List<ManifestFileMeta> wantedManifests = wantedSnapshot.readAllManifests(manifestList);
 
         FileStoreScan scan = store.newScan();
         scan.withManifestList(wantedManifests);
 
-        List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshot);
+        List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshotId);
         gen.sort(expectedKvs);
         Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
         runTest(scan, null, expected);