You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/13 02:27:22 UTC

[flink-table-store] branch master updated: [FLINK-27814] Extract snapshot related methods to SnapshotManager

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new b28ccbfe [FLINK-27814] Extract snapshot related methods to SnapshotManager
b28ccbfe is described below

commit b28ccbfe4fda3ddaccbea4489dad84b2e003350a
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Jun 13 10:27:17 2022 +0800

    [FLINK-27814] Extract snapshot related methods to SnapshotManager
    
    This closes #152
---
 .../source/ContinuousFileSplitEnumerator.java      |   9 +-
 .../store/connector/source/FileStoreSource.java    |   7 +-
 .../store/connector/AlterTableCompactITCase.java   |  23 ++--
 .../table/store/connector/sink/TestFileStore.java  |   6 +
 .../store/connector/source/TestDataReadWrite.java  |   4 +
 .../apache/flink/table/store/file/FileStore.java   |   3 +
 .../flink/table/store/file/FileStoreImpl.java      |  11 +-
 .../store/file/operation/FileStoreCommitImpl.java  |  34 +++--
 .../store/file/operation/FileStoreExpireImpl.java  |  58 ++++----
 .../table/store/file/operation/FileStoreScan.java  |   7 -
 .../store/file/operation/FileStoreScanImpl.java    |  35 +----
 .../store/file/operation/FileStoreWriteImpl.java   |   6 +-
 .../store/file/utils/FileStorePathFactory.java     |  27 +---
 .../table/store/file/utils/SnapshotFinder.java     | 112 ---------------
 .../table/store/file/utils/SnapshotManager.java    | 150 +++++++++++++++++++++
 .../table/store/table/AbstractFileStoreTable.java  |  10 +-
 .../store/table/AppendOnlyFileStoreTable.java      |   3 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   3 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   3 +-
 .../flink/table/store/table/FileStoreTable.java    |   3 +
 .../flink/table/store/file/TestFileStore.java      |  42 +++---
 .../store/file/operation/FileStoreCommitTest.java  |  26 ++--
 .../store/file/operation/FileStoreExpireTest.java  |  50 +++----
 .../store/file/operation/FileStoreReadTest.java    |   3 +-
 .../store/file/operation/FileStoreScanTest.java    |  13 +-
 .../store/file/utils/FileStorePathFactoryTest.java |   9 --
 .../store/file/utils/SnapshotManagerTest.java}     |  37 +++--
 27 files changed, 338 insertions(+), 356 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 78a51c15..66c70473 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,8 @@ public class ContinuousFileSplitEnumerator
 
     private final FileStoreScan scan;
 
+    private final SnapshotManager snapshotManager;
+
     private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
 
     private final long discoveryInterval;
@@ -70,12 +73,14 @@ public class ContinuousFileSplitEnumerator
     public ContinuousFileSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             FileStoreScan scan,
+            SnapshotManager snapshotManager,
             Collection<FileStoreSourceSplit> remainSplits,
             long currentSnapshotId,
             long discoveryInterval) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
         this.scan = checkNotNull(scan);
+        this.snapshotManager = snapshotManager;
         this.bucketSplits = new HashMap<>();
         addSplits(remainSplits);
         this.currentSnapshotId = currentSnapshotId;
@@ -191,7 +196,7 @@ public class ContinuousFileSplitEnumerator
         public EnumeratorResult call() {
             // TODO sync with processDiscoveredSplits to avoid too more splits in memory
             while (true) {
-                if (!scan.snapshotExists(nextSnapshotId)) {
+                if (!snapshotManager.snapshotExists(nextSnapshotId)) {
                     // TODO check latest snapshot id, expired?
                     LOG.debug(
                             "Next snapshot id {} not exists, wait for it to be generated.",
@@ -199,7 +204,7 @@ public class ContinuousFileSplitEnumerator
                     return null;
                 }
 
-                Snapshot snapshot = scan.snapshot(nextSnapshotId);
+                Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
                 if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
                     if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
                         LOG.warn("Ignore overwrite snapshot id {}.", nextSnapshotId);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index fccd3c7e..6cce9d9d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
@@ -126,6 +127,7 @@ public class FileStoreSource
     public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
+        SnapshotManager snapshotManager = fileStore.snapshotManager();
         FileStoreScan scan = fileStore.newScan();
 
         if (partitionPredicate != null) {
@@ -147,7 +149,7 @@ public class FileStoreSource
                 checkArgument(
                         isContinuous,
                         "The latest continuous can only be true when isContinuous is true.");
-                snapshotId = scan.latestSnapshot();
+                snapshotId = snapshotManager.latestSnapshotId();
                 splits = new ArrayList<>();
             } else {
                 FileStoreScan.Plan plan = scan.plan();
@@ -169,11 +171,12 @@ public class FileStoreSource
             return new ContinuousFileSplitEnumerator(
                     context,
                     scan.withIncremental(true), // the subsequent planning is all incremental
+                    snapshotManager,
                     splits,
                     currentSnapshot,
                     discoveryInterval);
         } else {
-            Snapshot snapshot = snapshotId == null ? null : scan.snapshot(snapshotId);
+            Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
             return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
         }
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index d664f5ee..41e1dbeb 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -26,14 +26,13 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
 import org.junit.Test;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -275,18 +274,18 @@ public class AlterTableCompactITCase extends FileStoreTableITCase {
         }
     }
 
-    private String getSnapshotDir(String tableName) {
-        return path
-                + relativeTablePath(
-                        ObjectIdentifier.of(
-                                bEnv.getCurrentCatalog(), bEnv.getCurrentDatabase(), tableName))
-                + "/snapshot";
+    private Path getTableDirectory(String tableName) {
+        return new Path(
+                path
+                        + relativeTablePath(
+                                ObjectIdentifier.of(
+                                        bEnv.getCurrentCatalog(),
+                                        bEnv.getCurrentDatabase(),
+                                        tableName)));
     }
 
     private Snapshot findLatestSnapshot(String tableName) throws IOException {
-        String snapshotDir = getSnapshotDir(tableName);
-        Long latest = SnapshotFinder.findLatest(new Path(URI.create(snapshotDir)));
-        return Snapshot.fromPath(
-                new Path(URI.create(snapshotDir + String.format("/snapshot-%d", latest))));
+        SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory(tableName));
+        return snapshotManager.snapshot(snapshotManager.latestSnapshotId());
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 5c8d255c..3b797e0e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.CompactWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
@@ -147,6 +148,11 @@ public class TestFileStore implements FileStore {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public SnapshotManager snapshotManager() {
+        throw new UnsupportedOperationException();
+    }
+
     static class TestRecordWriter implements RecordWriter {
 
         final List<String> records = new ArrayList<>();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
index 8ecba0c9..e17d7b47 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
 import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
@@ -60,6 +61,7 @@ public class TestDataReadWrite {
 
     private final FileFormat avro;
     private final FileStorePathFactory pathFactory;
+    private final SnapshotManager snapshotManager;
     private final ExecutorService service;
 
     public TestDataReadWrite(String root, ExecutorService service) {
@@ -74,6 +76,7 @@ public class TestDataReadWrite {
                         RowType.of(new IntType()),
                         "default",
                         FileStoreOptions.FILE_FORMAT.defaultValue());
+        this.snapshotManager = new SnapshotManager(new Path(root));
         this.service = service;
     }
 
@@ -111,6 +114,7 @@ public class TestDataReadWrite {
                         new DeduplicateMergeFunction(),
                         avro,
                         pathFactory,
+                        snapshotManager,
                         null, // not used, we only create an empty writer
                         options)
                 .createEmptyWriter(partition, bucket, service);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
index cc36435f..c6c205ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.operation.FileStoreExpire;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
@@ -40,6 +41,8 @@ public interface FileStore extends Serializable {
 
     FileStoreScan newScan();
 
+    SnapshotManager snapshotManager();
+
     RowType keyType();
 
     RowType valueType();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index ee7428a9..a0b2d243 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
 import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -113,6 +114,7 @@ public class FileStoreImpl implements FileStore {
                 mergeFunction,
                 options.fileFormat(),
                 pathFactory(),
+                snapshotManager(),
                 newScan(),
                 options.mergeTreeOptions());
     }
@@ -136,6 +138,7 @@ public class FileStoreImpl implements FileStore {
                 user,
                 partitionType,
                 pathFactory(),
+                snapshotManager(),
                 manifestFileFactory(),
                 manifestListFactory(),
                 newScan(),
@@ -151,6 +154,7 @@ public class FileStoreImpl implements FileStore {
                 options.snapshotNumRetainMax(),
                 options.snapshotTimeRetain().toMillis(),
                 pathFactory(),
+                snapshotManager(),
                 manifestFileFactory(),
                 manifestListFactory());
     }
@@ -161,12 +165,17 @@ public class FileStoreImpl implements FileStore {
                 partitionType,
                 keyType,
                 valueType,
-                pathFactory(),
+                snapshotManager(),
                 manifestFileFactory(),
                 manifestListFactory(),
                 options.bucket());
     }
 
+    @Override
+    public SnapshotManager snapshotManager() {
+        return new SnapshotManager(options.path());
+    }
+
     @Override
     public RowType keyType() {
         return keyType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 1c45ecfa..79b0f8cf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.store.file.predicate.PredicateConverter;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -78,6 +77,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final RowType partitionType;
     private final RowDataToObjectArrayConverter partitionObjectConverter;
     private final FileStorePathFactory pathFactory;
+    private final SnapshotManager snapshotManager;
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
     private final FileStoreScan scan;
@@ -92,6 +92,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             String commitUser,
             RowType partitionType,
             FileStorePathFactory pathFactory,
+            SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             FileStoreScan scan,
@@ -103,6 +104,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.partitionType = partitionType;
         this.partitionObjectConverter = new RowDataToObjectArrayConverter(partitionType);
         this.pathFactory = pathFactory;
+        this.snapshotManager = snapshotManager;
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
         this.scan = scan;
@@ -127,7 +129,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         // if there is no previous snapshots then nothing should be filtered
-        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
         if (latestSnapshotId == null) {
             return committableList;
         }
@@ -139,16 +141,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
-            Path snapshotPath = pathFactory.toSnapshotPath(id);
-            try {
-                if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
-                    // snapshots before this are expired
-                    break;
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("Cannot determine if snapshot #" + id + " exists.", e);
+            if (!snapshotManager.snapshotExists(id)) {
+                // snapshots before this are expired
+                break;
             }
-            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            Snapshot snapshot = snapshotManager.snapshot(id);
             if (commitUser.equals(snapshot.commitUser())) {
                 if (identifiers.containsKey(snapshot.commitIdentifier())) {
                     identifiers.remove(snapshot.commitIdentifier());
@@ -244,7 +241,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             Snapshot.CommitKind commitKind,
             boolean checkDeletedFiles) {
         while (true) {
-            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            Long latestSnapshotId = snapshotManager.latestSnapshotId();
             if (tryCommitOnce(
                     changes, hash, logOffsets, commitKind, latestSnapshotId, checkDeletedFiles)) {
                 break;
@@ -258,7 +255,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             String identifier,
             Map<Integer, Long> logOffsets) {
         while (true) {
-            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            Long latestSnapshotId = snapshotManager.latestSnapshotId();
 
             List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
             if (latestSnapshotId != null) {
@@ -323,8 +320,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             boolean checkDeletedFiles) {
         long newSnapshotId =
                 latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
-        Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
-        Path tmpSnapshotPath = pathFactory.toTmpSnapshotPath(newSnapshotId);
+        Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
+        Path tmpSnapshotPath = snapshotManager.tmpSnapshotPath(newSnapshotId);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Ready to commit changes to snapshot #" + newSnapshotId);
@@ -338,7 +335,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             if (checkDeletedFiles) {
                 noConflictsOrFail(latestSnapshotId, changes);
             }
-            latestSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+            latestSnapshot = snapshotManager.snapshot(latestSnapshotId);
         }
 
         Snapshot newSnapshot;
@@ -410,8 +407,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     () -> {
                         boolean committed = fs.rename(tmpSnapshotPath, newSnapshotPath);
                         if (committed) {
-                            SnapshotFinder.commitLatestHint(
-                                    pathFactory.snapshotDirectory(), newSnapshotId);
+                            snapshotManager.commitLatestHint(newSnapshotId);
                         }
                         return committed;
                     };
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index 1cc24cc1..690ba672 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +58,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
     private final long millisRetained;
 
     private final FileStorePathFactory pathFactory;
+    private final SnapshotManager snapshotManager;
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
 
@@ -68,12 +69,14 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             int numRetainedMax,
             long millisRetained,
             FileStorePathFactory pathFactory,
+            SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory) {
         this.numRetainedMin = numRetainedMin;
         this.numRetainedMax = numRetainedMax;
         this.millisRetained = millisRetained;
         this.pathFactory = pathFactory;
+        this.snapshotManager = snapshotManager;
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
     }
@@ -86,7 +89,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
 
     @Override
     public void expire() {
-        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
         if (latestSnapshotId == null) {
             // no snapshot, nothing to expire
             return;
@@ -96,7 +99,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
 
         Long earliest;
         try {
-            earliest = SnapshotFinder.findEarliest(pathFactory.snapshotDirectory());
+            earliest = snapshotManager.findEarliest();
         } catch (IOException e) {
             throw new RuntimeException("Failed to find earliest snapshot id", e);
         }
@@ -108,19 +111,13 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         for (long id = Math.max(latestSnapshotId - numRetainedMax + 1, earliest);
                 id <= latestSnapshotId - numRetainedMin;
                 id++) {
-            Path snapshotPath = pathFactory.toSnapshotPath(id);
-            try {
-                if (snapshotPath.getFileSystem().exists(snapshotPath)
-                        && currentMillis - Snapshot.fromPath(snapshotPath).timeMillis()
-                                <= millisRetained) {
-                    // within time threshold, can assume that all snapshots after it are also within
-                    // the threshold
-                    expireUntil(earliest, id);
-                    return;
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(
-                        "Failed to determine if snapshot #" + id + " still exists", e);
+            if (snapshotManager.snapshotExists(id)
+                    && currentMillis - snapshotManager.snapshot(id).timeMillis()
+                            <= millisRetained) {
+                // within time threshold, can assume that all snapshots after it are also within
+                // the threshold
+                expireUntil(earliest, id);
+                return;
             }
         }
 
@@ -133,9 +130,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             // No expire happens:
             // write the hint file in order to see the earliest snapshot directly next time
             // should avoid duplicate writes when the file exists
-            Path hint = new Path(pathFactory.snapshotDirectory(), SnapshotFinder.EARLIEST);
             try {
-                if (!hint.getFileSystem().exists(hint)) {
+                if (snapshotManager.readHint(SnapshotManager.EARLIEST) == null) {
                     writeEarliestHint(endExclusiveId);
                 }
             } catch (IOException e) {
@@ -149,17 +145,11 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         // find first snapshot to expire
         long beginInclusiveId = earliestId;
         for (long id = endExclusiveId - 1; id >= earliestId; id--) {
-            Path snapshotPath = pathFactory.toSnapshotPath(id);
-            try {
-                if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
-                    // only latest snapshots are retained, as we cannot find this snapshot, we can
-                    // assume that all snapshots preceding it have been removed
-                    beginInclusiveId = id + 1;
-                    break;
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(
-                        "Failed to determine if snapshot #" + id + " still exists", e);
+            if (!snapshotManager.snapshotExists(id)) {
+                // only latest snapshots are retained, as we cannot find this snapshot, we can
+                // assume that all snapshots preceding it have been removed
+                beginInclusiveId = id + 1;
+                break;
             }
         }
         if (LOG.isDebugEnabled()) {
@@ -177,7 +167,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
                 LOG.debug("Ready to delete data files in snapshot #" + id);
             }
 
-            Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+            Snapshot toExpire = snapshotManager.snapshot(id);
             List<ManifestFileMeta> deltaManifests = manifestList.read(toExpire.deltaManifestList());
 
             // we cannot delete a data file directly when we meet a DELETE entry, because that
@@ -208,7 +198,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         }
 
         // delete manifests
-        Snapshot exclusiveSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(endExclusiveId));
+        Snapshot exclusiveSnapshot = snapshotManager.snapshot(endExclusiveId);
         Set<ManifestFileMeta> manifestsInUse =
                 new HashSet<>(exclusiveSnapshot.readAllManifests(manifestList));
         // to avoid deleting twice
@@ -218,7 +208,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
                 LOG.debug("Ready to delete manifests in snapshot #" + id);
             }
 
-            Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+            Snapshot toExpire = snapshotManager.snapshot(id);
 
             for (ManifestFileMeta manifest : toExpire.readAllManifests(manifestList)) {
                 if (!manifestsInUse.contains(manifest) && !deletedManifests.contains(manifest)) {
@@ -232,7 +222,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             manifestList.delete(toExpire.deltaManifestList());
 
             // delete snapshot
-            FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
+            FileUtils.deleteOrWarn(snapshotManager.snapshotPath(id));
         }
 
         writeEarliestHint(endExclusiveId);
@@ -243,7 +233,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
 
         Callable<Void> callable =
                 () -> {
-                    SnapshotFinder.commitEarliestHint(pathFactory.snapshotDirectory(), earliest);
+                    snapshotManager.commitEarliestHint(earliest);
                     return null;
                 };
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 8d4171c7..7a9667be 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -38,12 +37,6 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 /** Scan operation which produces a plan. */
 public interface FileStoreScan {
 
-    Long latestSnapshot();
-
-    boolean snapshotExists(long snapshotId);
-
-    Snapshot snapshot(long snapshotId);
-
     FileStoreScan withPartitionFilter(Predicate predicate);
 
     FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index 6599fe1f..873bcf2a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -29,16 +28,14 @@ import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,7 +52,7 @@ public class FileStoreScanImpl implements FileStoreScan {
     private final FieldStatsArraySerializer keyStatsConverter;
     private final FieldStatsArraySerializer valueStatsConverter;
     private final RowDataToObjectArrayConverter partitionConverter;
-    private final FileStorePathFactory pathFactory;
+    private final SnapshotManager snapshotManager;
     private final ManifestFile.Factory manifestFileFactory;
     private final ManifestList manifestList;
     private final int numOfBuckets;
@@ -73,7 +70,7 @@ public class FileStoreScanImpl implements FileStoreScan {
             RowType partitionType,
             RowType keyType,
             RowType valueType,
-            FileStorePathFactory pathFactory,
+            SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets) {
@@ -81,32 +78,12 @@ public class FileStoreScanImpl implements FileStoreScan {
         this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
         this.valueStatsConverter = new FieldStatsArraySerializer(valueType);
         this.partitionConverter = new RowDataToObjectArrayConverter(partitionType);
-        this.pathFactory = pathFactory;
+        this.snapshotManager = snapshotManager;
         this.manifestFileFactory = manifestFileFactory;
         this.manifestList = manifestListFactory.create();
         this.numOfBuckets = numOfBuckets;
     }
 
-    @Override
-    public Long latestSnapshot() {
-        return pathFactory.latestSnapshotId();
-    }
-
-    @Override
-    public boolean snapshotExists(long snapshotId) {
-        Path path = pathFactory.toSnapshotPath(snapshotId);
-        try {
-            return path.getFileSystem().exists(path);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    @Override
-    public Snapshot snapshot(long snapshotId) {
-        return Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
-    }
-
     @Override
     public FileStoreScan withPartitionFilter(Predicate predicate) {
         this.partitionFilter = predicate;
@@ -188,12 +165,12 @@ public class FileStoreScanImpl implements FileStoreScan {
         Long snapshotId = specifiedSnapshotId;
         if (manifests == null) {
             if (snapshotId == null) {
-                snapshotId = pathFactory.latestSnapshotId();
+                snapshotId = snapshotManager.latestSnapshotId();
             }
             if (snapshotId == null) {
                 manifests = Collections.emptyList();
             } else {
-                Snapshot snapshot = snapshot(snapshotId);
+                Snapshot snapshot = snapshotManager.snapshot(snapshotId);
                 manifests =
                         isIncremental
                                 ? manifestList.read(snapshot.deltaManifestList())
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 53c2f3b3..447f06b6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
 import org.apache.flink.table.store.file.writer.CompactWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
@@ -64,6 +65,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
     private final MergeFunction mergeFunction;
     private final FileStorePathFactory pathFactory;
+    private final SnapshotManager snapshotManager;
     private final FileStoreScan scan;
     private final MergeTreeOptions options;
 
@@ -75,6 +77,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
             MergeFunction mergeFunction,
             FileFormat fileFormat,
             FileStorePathFactory pathFactory,
+            SnapshotManager snapshotManager,
             FileStoreScan scan,
             MergeTreeOptions options) {
         this.valueType = valueType;
@@ -88,6 +91,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
         this.keyComparatorSupplier = keyComparatorSupplier;
         this.mergeFunction = mergeFunction;
         this.pathFactory = pathFactory;
+        this.snapshotManager = snapshotManager;
         this.scan = scan;
         this.options = options;
     }
@@ -95,7 +99,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
     @Override
     public RecordWriter createWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
         List<DataFileMeta> existingFileMetas = Lists.newArrayList();
         if (latestSnapshotId != null) {
             // Concat all the DataFileMeta of existing files into existingFileMetas.
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 4f79a568..a992a74e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -31,18 +31,14 @@ import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.util.Preconditions;
 
-import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flink.table.store.file.utils.SnapshotFinder.SNAPSHOT_PREFIX;
-
-/** Factory which produces {@link Path}s for each type of files. */
+/** Factory which produces {@link Path}s for manifest files and factories for data file paths. */
 @ThreadSafe
 public class FileStorePathFactory {
 
@@ -110,14 +106,6 @@ public class FileStorePathFactory {
         return new Path(root + "/manifest/" + manifestListName);
     }
 
-    public Path toSnapshotPath(long id) {
-        return new Path(root + "/snapshot/" + SNAPSHOT_PREFIX + id);
-    }
-
-    public Path toTmpSnapshotPath(long id) {
-        return new Path(root + "/snapshot/." + SNAPSHOT_PREFIX + id + "-" + UUID.randomUUID());
-    }
-
     public DataFilePathFactory createDataFilePathFactory(BinaryRowData partition, int bucket) {
         return new DataFilePathFactory(
                 root, getPartitionString(partition), bucket, formatIdentifier);
@@ -131,19 +119,6 @@ public class FileStorePathFactory {
                                 partition, "Partition row data is null. This is unexpected.")));
     }
 
-    public Path snapshotDirectory() {
-        return new Path(root + "/snapshot");
-    }
-
-    @Nullable
-    public Long latestSnapshotId() {
-        try {
-            return SnapshotFinder.findLatest(snapshotDirectory());
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to find latest snapshot id", e);
-        }
-    }
-
     @VisibleForTesting
     public String uuid() {
         return uuid;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java
deleted file mode 100644
index 026d3f2e..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.utils;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.UUID;
-import java.util.function.BinaryOperator;
-
-import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
-
-/** Find latest and earliest snapshot. */
-public class SnapshotFinder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SnapshotFinder.class);
-
-    public static final String SNAPSHOT_PREFIX = "snapshot-";
-
-    public static final String EARLIEST = "EARLIEST";
-
-    public static final String LATEST = "LATEST";
-
-    public static Long findLatest(Path snapshotDir) throws IOException {
-        FileSystem fs = snapshotDir.getFileSystem();
-        if (!fs.exists(snapshotDir)) {
-            return null;
-        }
-
-        Long snapshotId = readHint(snapshotDir, LATEST);
-        if (snapshotId != null) {
-            long nextSnapshot = snapshotId + 1;
-            // it is the latest only there is no next one
-            if (!fs.exists(new Path(snapshotDir, SNAPSHOT_PREFIX + nextSnapshot))) {
-                return snapshotId;
-            }
-        }
-
-        return findByListFiles(snapshotDir, Math::max);
-    }
-
-    public static Long findEarliest(Path snapshotDir) throws IOException {
-        FileSystem fs = snapshotDir.getFileSystem();
-        if (!fs.exists(snapshotDir)) {
-            return null;
-        }
-
-        Long snapshotId = readHint(snapshotDir, EARLIEST);
-        // null and it is the earliest only it exists
-        if (snapshotId != null && fs.exists(new Path(snapshotDir, SNAPSHOT_PREFIX + snapshotId))) {
-            return snapshotId;
-        }
-
-        return findByListFiles(snapshotDir, Math::min);
-    }
-
-    @VisibleForTesting
-    public static Long readHint(Path snapshotDir, String fileName) throws IOException {
-        Path path = new Path(snapshotDir, fileName);
-        if (path.getFileSystem().exists(path)) {
-            return Long.parseLong(FileUtils.readFileUtf8(path));
-        }
-        return null;
-    }
-
-    private static Long findByListFiles(Path snapshotDir, BinaryOperator<Long> reducer)
-            throws IOException {
-        return listVersionedFiles(snapshotDir, SNAPSHOT_PREFIX).reduce(reducer).orElse(null);
-    }
-
-    public static void commitLatestHint(Path snapshotDir, long snapshotId) throws IOException {
-        commitHint(snapshotDir, snapshotId, LATEST);
-    }
-
-    public static void commitEarliestHint(Path snapshotDir, long snapshotId) throws IOException {
-        commitHint(snapshotDir, snapshotId, EARLIEST);
-    }
-
-    private static void commitHint(Path snapshotDir, long snapshotId, String fileName)
-            throws IOException {
-        FileSystem fs = snapshotDir.getFileSystem();
-        Path hintFile = new Path(snapshotDir, fileName);
-        Path tempFile = new Path(snapshotDir, UUID.randomUUID() + "-" + fileName + ".temp");
-        FileUtils.writeFileUtf8(tempFile, String.valueOf(snapshotId));
-        fs.delete(hintFile, false);
-        boolean success = fs.rename(tempFile, hintFile);
-        if (!success) {
-            fs.delete(tempFile, false);
-        }
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
new file mode 100644
index 00000000..8bdbc015
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.Snapshot;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.BinaryOperator;
+
+import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
+
+/** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */
+public class SnapshotManager {
+
+    private static final String SNAPSHOT_PREFIX = "snapshot-";
+    public static final String EARLIEST = "EARLIEST";
+    public static final String LATEST = "LATEST";
+
+    private final Path tablePath;
+
+    public SnapshotManager(Path tablePath) {
+        this.tablePath = tablePath;
+    }
+
+    public Path snapshotDirectory() {
+        return new Path(tablePath + "/snapshot");
+    }
+
+    public Path snapshotPath(long snapshotId) {
+        return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
+    }
+
+    public Path tmpSnapshotPath(long id) {
+        return new Path(tablePath + "/snapshot/." + SNAPSHOT_PREFIX + id + "-" + UUID.randomUUID());
+    }
+
+    public Snapshot snapshot(long snapshotId) {
+        return Snapshot.fromPath(snapshotPath(snapshotId));
+    }
+
+    public boolean snapshotExists(long snapshotId) {
+        Path path = snapshotPath(snapshotId);
+        try {
+            return path.getFileSystem().exists(path);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to determine if snapshot #" + snapshotId + " exists in path " + path,
+                    e);
+        }
+    }
+
+    public @Nullable Long latestSnapshotId() {
+        try {
+            return findLatest();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find latest snapshot id", e);
+        }
+    }
+
+    public Long findLatest() throws IOException {
+        Path snapshotDir = snapshotDirectory();
+        FileSystem fs = snapshotDir.getFileSystem();
+        if (!fs.exists(snapshotDir)) {
+            return null;
+        }
+
+        Long snapshotId = readHint(LATEST);
+        if (snapshotId != null) {
+            long nextSnapshot = snapshotId + 1;
+            // it is the latest only if there is no next one
+            if (!snapshotExists(nextSnapshot)) {
+                return snapshotId;
+            }
+        }
+
+        return findByListFiles(Math::max);
+    }
+
+    public Long findEarliest() throws IOException {
+        Path snapshotDir = snapshotDirectory();
+        FileSystem fs = snapshotDir.getFileSystem();
+        if (!fs.exists(snapshotDir)) {
+            return null;
+        }
+
+        Long snapshotId = readHint(EARLIEST);
+        // the hint is the earliest only if it is non-null and its snapshot still exists
+        if (snapshotId != null && snapshotExists(snapshotId)) {
+            return snapshotId;
+        }
+
+        return findByListFiles(Math::min);
+    }
+
+    public Long readHint(String fileName) throws IOException {
+        Path snapshotDir = snapshotDirectory();
+        Path path = new Path(snapshotDir, fileName);
+        if (path.getFileSystem().exists(path)) {
+            return Long.parseLong(FileUtils.readFileUtf8(path));
+        }
+        return null;
+    }
+
+    private Long findByListFiles(BinaryOperator<Long> reducer) throws IOException {
+        Path snapshotDir = snapshotDirectory();
+        return listVersionedFiles(snapshotDir, SNAPSHOT_PREFIX).reduce(reducer).orElse(null);
+    }
+
+    public void commitLatestHint(long snapshotId) throws IOException {
+        commitHint(snapshotId, LATEST);
+    }
+
+    public void commitEarliestHint(long snapshotId) throws IOException {
+        commitHint(snapshotId, EARLIEST);
+    }
+
+    private void commitHint(long snapshotId, String fileName) throws IOException {
+        Path snapshotDir = snapshotDirectory();
+        FileSystem fs = snapshotDir.getFileSystem();
+        Path hintFile = new Path(snapshotDir, fileName);
+        Path tempFile = new Path(snapshotDir, UUID.randomUUID() + "-" + fileName + ".temp");
+        FileUtils.writeFileUtf8(tempFile, String.valueOf(snapshotId));
+        fs.delete(hintFile, false);
+        boolean success = fs.rename(tempFile, hintFile);
+        if (!success) {
+            fs.delete(tempFile, false);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 6d52c0b3..6f7a34d4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.table.store.table;
 
-import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -46,10 +47,15 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
         return schema.logicalRowType();
     }
 
+    @Override
+    public SnapshotManager snapshotManager() {
+        return store().snapshotManager();
+    }
+
     @Override
     public TableCommit newCommit() {
         return new TableCommit(store().newCommit(), store().newExpire());
     }
 
-    protected abstract FileStore store();
+    protected abstract FileStoreImpl store();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 70e31059..26472592 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
@@ -111,7 +110,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    public FileStore store() {
+    public FileStoreImpl store() {
         return store;
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 071e8f56..caa98e3b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
@@ -134,7 +133,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    public FileStore store() {
+    public FileStoreImpl store() {
         return store;
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index fb56416b..f02259e6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
@@ -183,7 +182,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    public FileStore store() {
+    public FileStoreImpl store() {
         return store;
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 65645f76..7bd7feb6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
@@ -36,6 +37,8 @@ public interface FileStoreTable extends Serializable {
 
     RowType rowType();
 
+    SnapshotManager snapshotManager();
+
     TableScan newScan();
 
     TableRead newRead();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index e1688f2b..a8180385 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -38,7 +38,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.function.QuadFunction;
@@ -131,6 +131,7 @@ public class TestFileStore extends FileStoreImpl {
                 numRetainedMax,
                 millisRetained,
                 pathFactory(),
+                snapshotManager(),
                 manifestFileFactory(),
                 manifestListFactory());
     }
@@ -216,13 +217,13 @@ public class TestFileStore extends FileStoreImpl {
             }
         }
 
-        FileStorePathFactory pathFactory = pathFactory();
-        Long snapshotIdBeforeCommit = pathFactory.latestSnapshotId();
+        SnapshotManager snapshotManager = snapshotManager();
+        Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId();
         if (snapshotIdBeforeCommit == null) {
             snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
         }
         commitFunction.accept(commit, committable);
-        Long snapshotIdAfterCommit = pathFactory.latestSnapshotId();
+        Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
         if (snapshotIdAfterCommit == null) {
             snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
         }
@@ -240,7 +241,7 @@ public class TestFileStore extends FileStoreImpl {
 
         List<Snapshot> snapshots = new ArrayList<>();
         for (long id = snapshotIdBeforeCommit + 1; id <= snapshotIdAfterCommit; id++) {
-            snapshots.add(Snapshot.fromPath(pathFactory.toSnapshotPath(id)));
+            snapshots.add(snapshotManager.snapshot(id));
         }
         return snapshots;
     }
@@ -320,18 +321,19 @@ public class TestFileStore extends FileStoreImpl {
         // possibly not the most accurate, so this check is only.
         // - latest should < true_latest
         // - earliest should < true_earliest
-        Path snapshotDir = pathFactory().snapshotDirectory();
-        Path earliest = new Path(snapshotDir, SnapshotFinder.EARLIEST);
-        Path latest = new Path(snapshotDir, SnapshotFinder.LATEST);
+        SnapshotManager snapshotManager = snapshotManager();
+        Path snapshotDir = snapshotManager.snapshotDirectory();
+        Path earliest = new Path(snapshotDir, SnapshotManager.EARLIEST);
+        Path latest = new Path(snapshotDir, SnapshotManager.LATEST);
         if (actualFiles.remove(earliest)) {
-            long earliestId = SnapshotFinder.readHint(snapshotDir, SnapshotFinder.EARLIEST);
+            long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST);
             earliest.getFileSystem().delete(earliest, false);
-            assertThat(earliestId <= SnapshotFinder.findEarliest(snapshotDir)).isTrue();
+            assertThat(earliestId <= snapshotManager.findEarliest()).isTrue();
         }
         if (actualFiles.remove(latest)) {
-            long latestId = SnapshotFinder.readHint(snapshotDir, SnapshotFinder.LATEST);
+            long latestId = snapshotManager.readHint(SnapshotManager.LATEST);
             latest.getFileSystem().delete(latest, false);
-            assertThat(latestId <= SnapshotFinder.findLatest(snapshotDir)).isTrue();
+            assertThat(latestId <= snapshotManager.findLatest()).isTrue();
         }
         actualFiles.remove(latest);
 
@@ -345,27 +347,23 @@ public class TestFileStore extends FileStoreImpl {
         FileStorePathFactory.DataFilePathFactoryCache dataFilePathFactoryCache =
                 new FileStorePathFactory.DataFilePathFactoryCache(pathFactory);
 
-        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        SnapshotManager snapshotManager = snapshotManager();
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
         if (latestSnapshotId == null) {
             return Collections.emptySet();
         }
 
         long firstInUseSnapshotId = Snapshot.FIRST_SNAPSHOT_ID;
         for (long id = latestSnapshotId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
-            Path snapshotPath = pathFactory.toSnapshotPath(id);
-            try {
-                if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
-                    firstInUseSnapshotId = id + 1;
-                    break;
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+            if (!snapshotManager.snapshotExists(id)) {
+                firstInUseSnapshotId = id + 1;
+                break;
             }
         }
 
         Set<Path> result = new HashSet<>();
         for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) {
-            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Path snapshotPath = snapshotManager.snapshotPath(id);
             Snapshot snapshot = Snapshot.fromPath(snapshotPath);
 
             // snapshot file
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index ed270788..5c07c31e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -28,9 +28,8 @@ import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -41,7 +40,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -63,7 +61,7 @@ public class FileStoreCommitTest {
     @TempDir java.nio.file.Path tempDir;
 
     @BeforeEach
-    public void beforeEach() throws IOException {
+    public void beforeEach() {
         gen = new TestKeyValueGenerator();
         // for failure tests
         FailingAtomicRenameFileSystem.get().reset(100, 5000);
@@ -84,17 +82,18 @@ public class FileStoreCommitTest {
     @Test
     public void testLatestHint() throws Exception {
         testRandomConcurrentNoConflict(1, false);
-        Path snapshotDir = createStore(false, 1).pathFactory().snapshotDirectory();
-        Path latest = new Path(snapshotDir, SnapshotFinder.LATEST);
+        SnapshotManager snapshotManager = createStore(false, 1).snapshotManager();
+        Path snapshotDir = snapshotManager.snapshotDirectory();
+        Path latest = new Path(snapshotDir, SnapshotManager.LATEST);
 
         assertThat(latest.getFileSystem().exists(latest)).isTrue();
 
-        Long latestId = SnapshotFinder.findLatest(snapshotDir);
+        Long latestId = snapshotManager.findLatest();
 
         // remove latest hint file
         latest.getFileSystem().delete(latest, false);
 
-        assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(latestId);
+        assertThat(snapshotManager.findLatest()).isEqualTo(latestId);
     }
 
     @Test
@@ -102,8 +101,8 @@ public class FileStoreCommitTest {
         testRandomConcurrentNoConflict(1, false);
         // remove first snapshot to mimic expiration
         TestFileStore store = createStore(false);
-        FileStorePathFactory pathFactory = store.pathFactory();
-        Path firstSnapshotPath = pathFactory.toSnapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
+        SnapshotManager snapshotManager = store.snapshotManager();
+        Path firstSnapshotPath = snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
         FileUtils.deleteOrWarn(firstSnapshotPath);
         // this test succeeds if this call does not fail
         store.newCommit()
@@ -159,7 +158,7 @@ public class FileStoreCommitTest {
                                 .collect(Collectors.toList()));
 
         // read actual data and compare
-        Long snapshotId = store.pathFactory().latestSnapshotId();
+        Long snapshotId = store.snapshotManager().latestSnapshotId();
         assertThat(snapshotId).isNotNull();
         List<KeyValue> actualKvs = store.readKvsFromSnapshot(snapshotId);
         gen.sort(actualKvs);
@@ -231,7 +230,7 @@ public class FileStoreCommitTest {
         Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
 
         List<KeyValue> actualKvs =
-                store.readKvsFromSnapshot(store.pathFactory().latestSnapshotId());
+                store.readKvsFromSnapshot(store.snapshotManager().latestSnapshotId());
         gen.sort(actualKvs);
         Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
 
@@ -276,9 +275,8 @@ public class FileStoreCommitTest {
 
         store.commitData(
                 Collections.emptyList(), gen::getPartition, kv -> 0, Collections.emptyMap());
-        Path snapshotDir = store.pathFactory().snapshotDirectory();
 
-        assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(snapshot.id());
+        assertThat(store.snapshotManager().findLatest()).isEqualTo(snapshot.id());
     }
 
     private TestFileStore createStore(boolean failing) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 25516460..7b9bc4b0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.file.operation;
 
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
@@ -26,8 +25,7 @@ import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -48,7 +46,7 @@ public class FileStoreExpireTest {
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
     private TestFileStore store;
-    private FileStorePathFactory pathFactory;
+    private SnapshotManager snapshotManager;
 
     @BeforeEach
     public void beforeEach() throws IOException {
@@ -62,7 +60,7 @@ public class FileStoreExpireTest {
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         new DeduplicateMergeFunction());
-        pathFactory = store.pathFactory();
+        snapshotManager = store.snapshotManager();
     }
 
     @AfterEach
@@ -75,7 +73,7 @@ public class FileStoreExpireTest {
         FileStoreExpire expire = store.newExpire(1, 3, Long.MAX_VALUE);
         expire.expire();
 
-        assertThat(pathFactory.latestSnapshotId()).isNull();
+        assertThat(snapshotManager.latestSnapshotId()).isNull();
     }
 
     @Test
@@ -83,13 +81,12 @@ public class FileStoreExpireTest {
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
         commit(2, allData, snapshotPositions);
-        int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+        int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
         FileStoreExpire expire = store.newExpire(1, latestSnapshotId + 1, Long.MAX_VALUE);
         expire.expire();
 
-        FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
         for (int i = 1; i <= latestSnapshotId; i++) {
-            assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+            assertThat(snapshotManager.snapshotExists(i)).isTrue();
             assertSnapshot(i, allData, snapshotPositions);
         }
     }
@@ -99,13 +96,12 @@ public class FileStoreExpireTest {
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
         commit(5, allData, snapshotPositions);
-        int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+        int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
         FileStoreExpire expire = store.newExpire(1, Integer.MAX_VALUE, Long.MAX_VALUE);
         expire.expire();
 
-        FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
         for (int i = 1; i <= latestSnapshotId; i++) {
-            assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+            assertThat(snapshotManager.snapshotExists(i)).isTrue();
             assertSnapshot(i, allData, snapshotPositions);
         }
     }
@@ -118,17 +114,16 @@ public class FileStoreExpireTest {
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
         commit(numRetainedMin + random.nextInt(5), allData, snapshotPositions);
-        int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+        int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
         Thread.sleep(100);
         FileStoreExpire expire = store.newExpire(numRetainedMin, Integer.MAX_VALUE, 1);
         expire.expire();
 
-        FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
         for (int i = 1; i <= latestSnapshotId - numRetainedMin; i++) {
-            assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isFalse();
+            assertThat(snapshotManager.snapshotExists(i)).isFalse();
         }
         for (int i = latestSnapshotId - numRetainedMin + 1; i <= latestSnapshotId; i++) {
-            assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+            assertThat(snapshotManager.snapshotExists(i)).isTrue();
             assertSnapshot(i, allData, snapshotPositions);
         }
     }
@@ -143,31 +138,30 @@ public class FileStoreExpireTest {
             commit(ThreadLocalRandom.current().nextInt(5) + 1, allData, snapshotPositions);
             expire.expire();
 
-            int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
-            FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+            int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
             for (int j = 1; j <= latestSnapshotId; j++) {
                 if (j > latestSnapshotId - 3) {
-                    assertThat(fs.exists(pathFactory.toSnapshotPath(j))).isTrue();
+                    assertThat(snapshotManager.snapshotExists(j)).isTrue();
                     assertSnapshot(j, allData, snapshotPositions);
                 } else {
-                    assertThat(fs.exists(pathFactory.toSnapshotPath(j))).isFalse();
+                    assertThat(snapshotManager.snapshotExists(j)).isFalse();
                 }
             }
         }
 
         // validate earliest hint file
 
-        Path snapshotDir = pathFactory.snapshotDirectory();
-        Path earliest = new Path(snapshotDir, SnapshotFinder.EARLIEST);
+        Path snapshotDir = snapshotManager.snapshotDirectory();
+        Path earliest = new Path(snapshotDir, SnapshotManager.EARLIEST);
 
         assertThat(earliest.getFileSystem().exists(earliest)).isTrue();
 
-        Long earliestId = SnapshotFinder.findEarliest(snapshotDir);
+        Long earliestId = snapshotManager.findEarliest();
 
         // remove earliest hint file
         earliest.getFileSystem().delete(earliest, false);
 
-        assertThat(SnapshotFinder.findEarliest(snapshotDir)).isEqualTo(earliestId);
+        assertThat(snapshotManager.findEarliest()).isEqualTo(earliestId);
     }
 
     @Test
@@ -184,12 +178,10 @@ public class FileStoreExpireTest {
         expire.expire();
         expire.expire();
 
-        int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
-        FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+        int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
         for (int i = 1; i <= latestSnapshotId; i++) {
-            Path snapshotPath = pathFactory.toSnapshotPath(i);
-            if (fs.exists(snapshotPath)) {
-                assertThat(Snapshot.fromPath(snapshotPath).timeMillis())
+            if (snapshotManager.snapshotExists(i)) {
+                assertThat(snapshotManager.snapshot(i).timeMillis())
                         .isBetween(expireMillis - 1000, expireMillis);
                 assertSnapshot(i, allData, snapshotPositions);
             }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
index e236d517..a3378ed1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
@@ -186,7 +186,8 @@ public class FileStoreReadTest {
         store.commitData(data, partitionCalculator, kv -> 0);
         FileStoreScan scan = store.newScan();
         Map<BinaryRowData, List<ManifestEntry>> filesGroupedByPartition =
-                scan.withSnapshot(store.pathFactory().latestSnapshotId()).plan().files().stream()
+                scan.withSnapshot(store.snapshotManager().latestSnapshotId()).plan().files()
+                        .stream()
                         .collect(Collectors.groupingBy(ManifestEntry::partition));
         FileStoreRead read = store.newRead();
         if (keyProjection != null) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index e2ca788c..f6245761 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -28,14 +28,13 @@ import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.IntType;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -55,10 +54,10 @@ public class FileStoreScanTest {
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
     private TestFileStore store;
-    private FileStorePathFactory pathFactory;
+    private SnapshotManager snapshotManager;
 
     @BeforeEach
-    public void beforeEach() throws IOException {
+    public void beforeEach() {
         gen = new TestKeyValueGenerator();
         store =
                 TestFileStore.create(
@@ -69,7 +68,7 @@ public class FileStoreScanTest {
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         new DeduplicateMergeFunction());
-        pathFactory = store.pathFactory();
+        snapshotManager = store.snapshotManager();
     }
 
     @Test
@@ -203,8 +202,8 @@ public class FileStoreScanTest {
         }
 
         ManifestList manifestList = store.manifestListFactory().create();
-        long wantedSnapshotId = random.nextLong(pathFactory.latestSnapshotId()) + 1;
-        Snapshot wantedSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshotId));
+        long wantedSnapshotId = random.nextLong(snapshotManager.latestSnapshotId()) + 1;
+        Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId);
         List<ManifestFileMeta> wantedManifests = wantedSnapshot.readAllManifests(manifestList);
 
         FileStoreScan scan = store.newScan();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
index cd3a7827..154ec613 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
@@ -66,15 +66,6 @@ public class FileStorePathFactoryTest {
                 .isEqualTo(new Path(tempDir.toString() + "/manifest/my-manifest-list-file-name"));
     }
 
-    @Test
-    public void testSnapshotPath() {
-        FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(tempDir.toString()));
-        for (int i = 0; i < 20; i++) {
-            assertThat(pathFactory.toSnapshotPath(i))
-                    .isEqualTo(new Path(tempDir.toString() + "/snapshot/snapshot-" + i));
-        }
-    }
-
     @Test
     public void testCreateDataFilePathFactoryNoPartition() {
         FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(tempDir.toString()));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/SnapshotManagerTest.java
similarity index 52%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
copy to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/SnapshotManagerTest.java
index 65645f76..97525003 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/SnapshotManagerTest.java
@@ -16,31 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.table;
+package org.apache.flink.table.store.file.utils;
 
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.core.fs.Path;
 
-import java.io.Serializable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
-/**
- * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
- * and writing of {@link org.apache.flink.table.data.RowData}.
- */
-public interface FileStoreTable extends Serializable {
-
-    String name();
-
-    RowType rowType();
-
-    TableScan newScan();
+import static org.assertj.core.api.Assertions.assertThat;
 
-    TableRead newRead();
+/** Tests for {@link SnapshotManager}. */
+public class SnapshotManagerTest {
 
-    TableWrite newWrite();
+    @TempDir java.nio.file.Path tempDir;
 
-    TableCommit newCommit();
+    @Test
+    public void testSnapshotPath() {
+        SnapshotManager snapshotManager = new SnapshotManager(new Path(tempDir.toString()));
+        for (int i = 0; i < 20; i++) {
+            assertThat(snapshotManager.snapshotPath(i))
+                    .isEqualTo(new Path(tempDir.toString() + "/snapshot/snapshot-" + i));
+        }
+    }
 }