You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/13 02:27:22 UTC
[flink-table-store] branch master updated: [FLINK-27814] Extract snapshot related methods to SnapshotManager
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new b28ccbfe [FLINK-27814] Extract snapshot related methods to SnapshotManager
b28ccbfe is described below
commit b28ccbfe4fda3ddaccbea4489dad84b2e003350a
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Jun 13 10:27:17 2022 +0800
[FLINK-27814] Extract snapshot related methods to SnapshotManager
This closes #152
---
.../source/ContinuousFileSplitEnumerator.java | 9 +-
.../store/connector/source/FileStoreSource.java | 7 +-
.../store/connector/AlterTableCompactITCase.java | 23 ++--
.../table/store/connector/sink/TestFileStore.java | 6 +
.../store/connector/source/TestDataReadWrite.java | 4 +
.../apache/flink/table/store/file/FileStore.java | 3 +
.../flink/table/store/file/FileStoreImpl.java | 11 +-
.../store/file/operation/FileStoreCommitImpl.java | 34 +++--
.../store/file/operation/FileStoreExpireImpl.java | 58 ++++----
.../table/store/file/operation/FileStoreScan.java | 7 -
.../store/file/operation/FileStoreScanImpl.java | 35 +----
.../store/file/operation/FileStoreWriteImpl.java | 6 +-
.../store/file/utils/FileStorePathFactory.java | 27 +---
.../table/store/file/utils/SnapshotFinder.java | 112 ---------------
.../table/store/file/utils/SnapshotManager.java | 150 +++++++++++++++++++++
.../table/store/table/AbstractFileStoreTable.java | 10 +-
.../store/table/AppendOnlyFileStoreTable.java | 3 +-
.../table/ChangelogValueCountFileStoreTable.java | 3 +-
.../table/ChangelogWithKeyFileStoreTable.java | 3 +-
.../flink/table/store/table/FileStoreTable.java | 3 +
.../flink/table/store/file/TestFileStore.java | 42 +++---
.../store/file/operation/FileStoreCommitTest.java | 26 ++--
.../store/file/operation/FileStoreExpireTest.java | 50 +++----
.../store/file/operation/FileStoreReadTest.java | 3 +-
.../store/file/operation/FileStoreScanTest.java | 13 +-
.../store/file/utils/FileStorePathFactoryTest.java | 9 --
.../store/file/utils/SnapshotManagerTest.java} | 37 +++--
27 files changed, 338 insertions(+), 356 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 78a51c15..66c70473 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,8 @@ public class ContinuousFileSplitEnumerator
private final FileStoreScan scan;
+ private final SnapshotManager snapshotManager;
+
private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
private final long discoveryInterval;
@@ -70,12 +73,14 @@ public class ContinuousFileSplitEnumerator
public ContinuousFileSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
FileStoreScan scan,
+ SnapshotManager snapshotManager,
Collection<FileStoreSourceSplit> remainSplits,
long currentSnapshotId,
long discoveryInterval) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.scan = checkNotNull(scan);
+ this.snapshotManager = snapshotManager;
this.bucketSplits = new HashMap<>();
addSplits(remainSplits);
this.currentSnapshotId = currentSnapshotId;
@@ -191,7 +196,7 @@ public class ContinuousFileSplitEnumerator
public EnumeratorResult call() {
// TODO sync with processDiscoveredSplits to avoid too more splits in memory
while (true) {
- if (!scan.snapshotExists(nextSnapshotId)) {
+ if (!snapshotManager.snapshotExists(nextSnapshotId)) {
// TODO check latest snapshot id, expired?
LOG.debug(
"Next snapshot id {} not exists, wait for it to be generated.",
@@ -199,7 +204,7 @@ public class ContinuousFileSplitEnumerator
return null;
}
- Snapshot snapshot = scan.snapshot(nextSnapshotId);
+ Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
LOG.warn("Ignore overwrite snapshot id {}.", nextSnapshotId);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index fccd3c7e..6cce9d9d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -126,6 +127,7 @@ public class FileStoreSource
public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
PendingSplitsCheckpoint checkpoint) {
+ SnapshotManager snapshotManager = fileStore.snapshotManager();
FileStoreScan scan = fileStore.newScan();
if (partitionPredicate != null) {
@@ -147,7 +149,7 @@ public class FileStoreSource
checkArgument(
isContinuous,
"The latest continuous can only be true when isContinuous is true.");
- snapshotId = scan.latestSnapshot();
+ snapshotId = snapshotManager.latestSnapshotId();
splits = new ArrayList<>();
} else {
FileStoreScan.Plan plan = scan.plan();
@@ -169,11 +171,12 @@ public class FileStoreSource
return new ContinuousFileSplitEnumerator(
context,
scan.withIncremental(true), // the subsequent planning is all incremental
+ snapshotManager,
splits,
currentSnapshot,
discoveryInterval);
} else {
- Snapshot snapshot = snapshotId == null ? null : scan.snapshot(snapshotId);
+ Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index d664f5ee..41e1dbeb 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -26,14 +26,13 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.Test;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -275,18 +274,18 @@ public class AlterTableCompactITCase extends FileStoreTableITCase {
}
}
- private String getSnapshotDir(String tableName) {
- return path
- + relativeTablePath(
- ObjectIdentifier.of(
- bEnv.getCurrentCatalog(), bEnv.getCurrentDatabase(), tableName))
- + "/snapshot";
+ private Path getTableDirectory(String tableName) {
+ return new Path(
+ path
+ + relativeTablePath(
+ ObjectIdentifier.of(
+ bEnv.getCurrentCatalog(),
+ bEnv.getCurrentDatabase(),
+ tableName)));
}
private Snapshot findLatestSnapshot(String tableName) throws IOException {
- String snapshotDir = getSnapshotDir(tableName);
- Long latest = SnapshotFinder.findLatest(new Path(URI.create(snapshotDir)));
- return Snapshot.fromPath(
- new Path(URI.create(snapshotDir + String.format("/snapshot-%d", latest))));
+ SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory(tableName));
+ return snapshotManager.snapshot(snapshotManager.latestSnapshotId());
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 5c8d255c..3b797e0e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.CompactWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
@@ -147,6 +148,11 @@ public class TestFileStore implements FileStore {
throw new UnsupportedOperationException();
}
+ @Override
+ public SnapshotManager snapshotManager() {
+ throw new UnsupportedOperationException();
+ }
+
static class TestRecordWriter implements RecordWriter {
final List<String> records = new ArrayList<>();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
index 8ecba0c9..e17d7b47 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
@@ -60,6 +61,7 @@ public class TestDataReadWrite {
private final FileFormat avro;
private final FileStorePathFactory pathFactory;
+ private final SnapshotManager snapshotManager;
private final ExecutorService service;
public TestDataReadWrite(String root, ExecutorService service) {
@@ -74,6 +76,7 @@ public class TestDataReadWrite {
RowType.of(new IntType()),
"default",
FileStoreOptions.FILE_FORMAT.defaultValue());
+ this.snapshotManager = new SnapshotManager(new Path(root));
this.service = service;
}
@@ -111,6 +114,7 @@ public class TestDataReadWrite {
new DeduplicateMergeFunction(),
avro,
pathFactory,
+ snapshotManager,
null, // not used, we only create an empty writer
options)
.createEmptyWriter(partition, bucket, service);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
index cc36435f..c6c205ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.operation.FileStoreExpire;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
@@ -40,6 +41,8 @@ public interface FileStore extends Serializable {
FileStoreScan newScan();
+ SnapshotManager snapshotManager();
+
RowType keyType();
RowType valueType();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index ee7428a9..a0b2d243 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -113,6 +114,7 @@ public class FileStoreImpl implements FileStore {
mergeFunction,
options.fileFormat(),
pathFactory(),
+ snapshotManager(),
newScan(),
options.mergeTreeOptions());
}
@@ -136,6 +138,7 @@ public class FileStoreImpl implements FileStore {
user,
partitionType,
pathFactory(),
+ snapshotManager(),
manifestFileFactory(),
manifestListFactory(),
newScan(),
@@ -151,6 +154,7 @@ public class FileStoreImpl implements FileStore {
options.snapshotNumRetainMax(),
options.snapshotTimeRetain().toMillis(),
pathFactory(),
+ snapshotManager(),
manifestFileFactory(),
manifestListFactory());
}
@@ -161,12 +165,17 @@ public class FileStoreImpl implements FileStore {
partitionType,
keyType,
valueType,
- pathFactory(),
+ snapshotManager(),
manifestFileFactory(),
manifestListFactory(),
options.bucket());
}
+ @Override
+ public SnapshotManager snapshotManager() {
+ return new SnapshotManager(options.path());
+ }
+
@Override
public RowType keyType() {
return keyType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 1c45ecfa..79b0f8cf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.store.file.predicate.PredicateConverter;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -78,6 +77,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final RowType partitionType;
private final RowDataToObjectArrayConverter partitionObjectConverter;
private final FileStorePathFactory pathFactory;
+ private final SnapshotManager snapshotManager;
private final ManifestFile manifestFile;
private final ManifestList manifestList;
private final FileStoreScan scan;
@@ -92,6 +92,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
String commitUser,
RowType partitionType,
FileStorePathFactory pathFactory,
+ SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
FileStoreScan scan,
@@ -103,6 +104,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
this.partitionType = partitionType;
this.partitionObjectConverter = new RowDataToObjectArrayConverter(partitionType);
this.pathFactory = pathFactory;
+ this.snapshotManager = snapshotManager;
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
this.scan = scan;
@@ -127,7 +129,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
// if there is no previous snapshots then nothing should be filtered
- Long latestSnapshotId = pathFactory.latestSnapshotId();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (latestSnapshotId == null) {
return committableList;
}
@@ -139,16 +141,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
- Path snapshotPath = pathFactory.toSnapshotPath(id);
- try {
- if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
- // snapshots before this are expired
- break;
- }
- } catch (IOException e) {
- throw new RuntimeException("Cannot determine if snapshot #" + id + " exists.", e);
+ if (!snapshotManager.snapshotExists(id)) {
+ // snapshots before this are expired
+ break;
}
- Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+ Snapshot snapshot = snapshotManager.snapshot(id);
if (commitUser.equals(snapshot.commitUser())) {
if (identifiers.containsKey(snapshot.commitIdentifier())) {
identifiers.remove(snapshot.commitIdentifier());
@@ -244,7 +241,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
Snapshot.CommitKind commitKind,
boolean checkDeletedFiles) {
while (true) {
- Long latestSnapshotId = pathFactory.latestSnapshotId();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (tryCommitOnce(
changes, hash, logOffsets, commitKind, latestSnapshotId, checkDeletedFiles)) {
break;
@@ -258,7 +255,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
String identifier,
Map<Integer, Long> logOffsets) {
while (true) {
- Long latestSnapshotId = pathFactory.latestSnapshotId();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
if (latestSnapshotId != null) {
@@ -323,8 +320,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
boolean checkDeletedFiles) {
long newSnapshotId =
latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
- Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
- Path tmpSnapshotPath = pathFactory.toTmpSnapshotPath(newSnapshotId);
+ Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
+ Path tmpSnapshotPath = snapshotManager.tmpSnapshotPath(newSnapshotId);
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit changes to snapshot #" + newSnapshotId);
@@ -338,7 +335,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (checkDeletedFiles) {
noConflictsOrFail(latestSnapshotId, changes);
}
- latestSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+ latestSnapshot = snapshotManager.snapshot(latestSnapshotId);
}
Snapshot newSnapshot;
@@ -410,8 +407,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
() -> {
boolean committed = fs.rename(tmpSnapshotPath, newSnapshotPath);
if (committed) {
- SnapshotFinder.commitLatestHint(
- pathFactory.snapshotDirectory(), newSnapshotId);
+ snapshotManager.commitLatestHint(newSnapshotId);
}
return committed;
};
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index 1cc24cc1..690ba672 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +58,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
private final long millisRetained;
private final FileStorePathFactory pathFactory;
+ private final SnapshotManager snapshotManager;
private final ManifestFile manifestFile;
private final ManifestList manifestList;
@@ -68,12 +69,14 @@ public class FileStoreExpireImpl implements FileStoreExpire {
int numRetainedMax,
long millisRetained,
FileStorePathFactory pathFactory,
+ SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory) {
this.numRetainedMin = numRetainedMin;
this.numRetainedMax = numRetainedMax;
this.millisRetained = millisRetained;
this.pathFactory = pathFactory;
+ this.snapshotManager = snapshotManager;
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
}
@@ -86,7 +89,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
@Override
public void expire() {
- Long latestSnapshotId = pathFactory.latestSnapshotId();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (latestSnapshotId == null) {
// no snapshot, nothing to expire
return;
@@ -96,7 +99,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
Long earliest;
try {
- earliest = SnapshotFinder.findEarliest(pathFactory.snapshotDirectory());
+ earliest = snapshotManager.findEarliest();
} catch (IOException e) {
throw new RuntimeException("Failed to find earliest snapshot id", e);
}
@@ -108,19 +111,13 @@ public class FileStoreExpireImpl implements FileStoreExpire {
for (long id = Math.max(latestSnapshotId - numRetainedMax + 1, earliest);
id <= latestSnapshotId - numRetainedMin;
id++) {
- Path snapshotPath = pathFactory.toSnapshotPath(id);
- try {
- if (snapshotPath.getFileSystem().exists(snapshotPath)
- && currentMillis - Snapshot.fromPath(snapshotPath).timeMillis()
- <= millisRetained) {
- // within time threshold, can assume that all snapshots after it are also within
- // the threshold
- expireUntil(earliest, id);
- return;
- }
- } catch (IOException e) {
- throw new RuntimeException(
- "Failed to determine if snapshot #" + id + " still exists", e);
+ if (snapshotManager.snapshotExists(id)
+ && currentMillis - snapshotManager.snapshot(id).timeMillis()
+ <= millisRetained) {
+ // within time threshold, can assume that all snapshots after it are also within
+ // the threshold
+ expireUntil(earliest, id);
+ return;
}
}
@@ -133,9 +130,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
// No expire happens:
// write the hint file in order to see the earliest snapshot directly next time
// should avoid duplicate writes when the file exists
- Path hint = new Path(pathFactory.snapshotDirectory(), SnapshotFinder.EARLIEST);
try {
- if (!hint.getFileSystem().exists(hint)) {
+ if (snapshotManager.readHint(SnapshotManager.EARLIEST) == null) {
writeEarliestHint(endExclusiveId);
}
} catch (IOException e) {
@@ -149,17 +145,11 @@ public class FileStoreExpireImpl implements FileStoreExpire {
// find first snapshot to expire
long beginInclusiveId = earliestId;
for (long id = endExclusiveId - 1; id >= earliestId; id--) {
- Path snapshotPath = pathFactory.toSnapshotPath(id);
- try {
- if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
- // only latest snapshots are retained, as we cannot find this snapshot, we can
- // assume that all snapshots preceding it have been removed
- beginInclusiveId = id + 1;
- break;
- }
- } catch (IOException e) {
- throw new RuntimeException(
- "Failed to determine if snapshot #" + id + " still exists", e);
+ if (!snapshotManager.snapshotExists(id)) {
+ // only latest snapshots are retained, as we cannot find this snapshot, we can
+ // assume that all snapshots preceding it have been removed
+ beginInclusiveId = id + 1;
+ break;
}
}
if (LOG.isDebugEnabled()) {
@@ -177,7 +167,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
LOG.debug("Ready to delete data files in snapshot #" + id);
}
- Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+ Snapshot toExpire = snapshotManager.snapshot(id);
List<ManifestFileMeta> deltaManifests = manifestList.read(toExpire.deltaManifestList());
// we cannot delete a data file directly when we meet a DELETE entry, because that
@@ -208,7 +198,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
}
// delete manifests
- Snapshot exclusiveSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(endExclusiveId));
+ Snapshot exclusiveSnapshot = snapshotManager.snapshot(endExclusiveId);
Set<ManifestFileMeta> manifestsInUse =
new HashSet<>(exclusiveSnapshot.readAllManifests(manifestList));
// to avoid deleting twice
@@ -218,7 +208,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
LOG.debug("Ready to delete manifests in snapshot #" + id);
}
- Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+ Snapshot toExpire = snapshotManager.snapshot(id);
for (ManifestFileMeta manifest : toExpire.readAllManifests(manifestList)) {
if (!manifestsInUse.contains(manifest) && !deletedManifests.contains(manifest)) {
@@ -232,7 +222,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
manifestList.delete(toExpire.deltaManifestList());
// delete snapshot
- FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
+ FileUtils.deleteOrWarn(snapshotManager.snapshotPath(id));
}
writeEarliestHint(endExclusiveId);
@@ -243,7 +233,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
Callable<Void> callable =
() -> {
- SnapshotFinder.commitEarliestHint(pathFactory.snapshotDirectory(), earliest);
+ snapshotManager.commitEarliestHint(earliest);
return null;
};
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 8d4171c7..7a9667be 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -38,12 +37,6 @@ import static org.apache.flink.util.Preconditions.checkArgument;
/** Scan operation which produces a plan. */
public interface FileStoreScan {
- Long latestSnapshot();
-
- boolean snapshotExists(long snapshotId);
-
- Snapshot snapshot(long snapshotId);
-
FileStoreScan withPartitionFilter(Predicate predicate);
FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index 6599fe1f..873bcf2a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -29,16 +28,14 @@ import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
-import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -55,7 +52,7 @@ public class FileStoreScanImpl implements FileStoreScan {
private final FieldStatsArraySerializer keyStatsConverter;
private final FieldStatsArraySerializer valueStatsConverter;
private final RowDataToObjectArrayConverter partitionConverter;
- private final FileStorePathFactory pathFactory;
+ private final SnapshotManager snapshotManager;
private final ManifestFile.Factory manifestFileFactory;
private final ManifestList manifestList;
private final int numOfBuckets;
@@ -73,7 +70,7 @@ public class FileStoreScanImpl implements FileStoreScan {
RowType partitionType,
RowType keyType,
RowType valueType,
- FileStorePathFactory pathFactory,
+ SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets) {
@@ -81,32 +78,12 @@ public class FileStoreScanImpl implements FileStoreScan {
this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
this.valueStatsConverter = new FieldStatsArraySerializer(valueType);
this.partitionConverter = new RowDataToObjectArrayConverter(partitionType);
- this.pathFactory = pathFactory;
+ this.snapshotManager = snapshotManager;
this.manifestFileFactory = manifestFileFactory;
this.manifestList = manifestListFactory.create();
this.numOfBuckets = numOfBuckets;
}
- @Override
- public Long latestSnapshot() {
- return pathFactory.latestSnapshotId();
- }
-
- @Override
- public boolean snapshotExists(long snapshotId) {
- Path path = pathFactory.toSnapshotPath(snapshotId);
- try {
- return path.getFileSystem().exists(path);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- @Override
- public Snapshot snapshot(long snapshotId) {
- return Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
- }
-
@Override
public FileStoreScan withPartitionFilter(Predicate predicate) {
this.partitionFilter = predicate;
@@ -188,12 +165,12 @@ public class FileStoreScanImpl implements FileStoreScan {
Long snapshotId = specifiedSnapshotId;
if (manifests == null) {
if (snapshotId == null) {
- snapshotId = pathFactory.latestSnapshotId();
+ snapshotId = snapshotManager.latestSnapshotId();
}
if (snapshotId == null) {
manifests = Collections.emptyList();
} else {
- Snapshot snapshot = snapshot(snapshotId);
+ Snapshot snapshot = snapshotManager.snapshot(snapshotId);
manifests =
isIncremental
? manifestList.read(snapshot.deltaManifestList())
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 53c2f3b3..447f06b6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
import org.apache.flink.table.store.file.writer.CompactWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
@@ -64,6 +65,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
private final Supplier<Comparator<RowData>> keyComparatorSupplier;
private final MergeFunction mergeFunction;
private final FileStorePathFactory pathFactory;
+ private final SnapshotManager snapshotManager;
private final FileStoreScan scan;
private final MergeTreeOptions options;
@@ -75,6 +77,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
MergeFunction mergeFunction,
FileFormat fileFormat,
FileStorePathFactory pathFactory,
+ SnapshotManager snapshotManager,
FileStoreScan scan,
MergeTreeOptions options) {
this.valueType = valueType;
@@ -88,6 +91,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
this.keyComparatorSupplier = keyComparatorSupplier;
this.mergeFunction = mergeFunction;
this.pathFactory = pathFactory;
+ this.snapshotManager = snapshotManager;
this.scan = scan;
this.options = options;
}
@@ -95,7 +99,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
@Override
public RecordWriter createWriter(
BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
- Long latestSnapshotId = pathFactory.latestSnapshotId();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
List<DataFileMeta> existingFileMetas = Lists.newArrayList();
if (latestSnapshotId != null) {
// Concat all the DataFileMeta of existing files into existingFileMetas.
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 4f79a568..a992a74e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -31,18 +31,14 @@ import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;
-import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.flink.table.store.file.utils.SnapshotFinder.SNAPSHOT_PREFIX;
-
-/** Factory which produces {@link Path}s for each type of files. */
+/** Factory which produces {@link Path}s for manifest files. */
@ThreadSafe
public class FileStorePathFactory {
@@ -110,14 +106,6 @@ public class FileStorePathFactory {
return new Path(root + "/manifest/" + manifestListName);
}
- public Path toSnapshotPath(long id) {
- return new Path(root + "/snapshot/" + SNAPSHOT_PREFIX + id);
- }
-
- public Path toTmpSnapshotPath(long id) {
- return new Path(root + "/snapshot/." + SNAPSHOT_PREFIX + id + "-" + UUID.randomUUID());
- }
-
public DataFilePathFactory createDataFilePathFactory(BinaryRowData partition, int bucket) {
return new DataFilePathFactory(
root, getPartitionString(partition), bucket, formatIdentifier);
@@ -131,19 +119,6 @@ public class FileStorePathFactory {
partition, "Partition row data is null. This is unexpected.")));
}
- public Path snapshotDirectory() {
- return new Path(root + "/snapshot");
- }
-
- @Nullable
- public Long latestSnapshotId() {
- try {
- return SnapshotFinder.findLatest(snapshotDirectory());
- } catch (IOException e) {
- throw new RuntimeException("Failed to find latest snapshot id", e);
- }
- }
-
@VisibleForTesting
public String uuid() {
return uuid;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java
deleted file mode 100644
index 026d3f2e..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.utils;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.UUID;
-import java.util.function.BinaryOperator;
-
-import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
-
-/** Find latest and earliest snapshot. */
-public class SnapshotFinder {
-
- private static final Logger LOG = LoggerFactory.getLogger(SnapshotFinder.class);
-
- public static final String SNAPSHOT_PREFIX = "snapshot-";
-
- public static final String EARLIEST = "EARLIEST";
-
- public static final String LATEST = "LATEST";
-
- public static Long findLatest(Path snapshotDir) throws IOException {
- FileSystem fs = snapshotDir.getFileSystem();
- if (!fs.exists(snapshotDir)) {
- return null;
- }
-
- Long snapshotId = readHint(snapshotDir, LATEST);
- if (snapshotId != null) {
- long nextSnapshot = snapshotId + 1;
- // it is the latest only there is no next one
- if (!fs.exists(new Path(snapshotDir, SNAPSHOT_PREFIX + nextSnapshot))) {
- return snapshotId;
- }
- }
-
- return findByListFiles(snapshotDir, Math::max);
- }
-
- public static Long findEarliest(Path snapshotDir) throws IOException {
- FileSystem fs = snapshotDir.getFileSystem();
- if (!fs.exists(snapshotDir)) {
- return null;
- }
-
- Long snapshotId = readHint(snapshotDir, EARLIEST);
- // null and it is the earliest only it exists
- if (snapshotId != null && fs.exists(new Path(snapshotDir, SNAPSHOT_PREFIX + snapshotId))) {
- return snapshotId;
- }
-
- return findByListFiles(snapshotDir, Math::min);
- }
-
- @VisibleForTesting
- public static Long readHint(Path snapshotDir, String fileName) throws IOException {
- Path path = new Path(snapshotDir, fileName);
- if (path.getFileSystem().exists(path)) {
- return Long.parseLong(FileUtils.readFileUtf8(path));
- }
- return null;
- }
-
- private static Long findByListFiles(Path snapshotDir, BinaryOperator<Long> reducer)
- throws IOException {
- return listVersionedFiles(snapshotDir, SNAPSHOT_PREFIX).reduce(reducer).orElse(null);
- }
-
- public static void commitLatestHint(Path snapshotDir, long snapshotId) throws IOException {
- commitHint(snapshotDir, snapshotId, LATEST);
- }
-
- public static void commitEarliestHint(Path snapshotDir, long snapshotId) throws IOException {
- commitHint(snapshotDir, snapshotId, EARLIEST);
- }
-
- private static void commitHint(Path snapshotDir, long snapshotId, String fileName)
- throws IOException {
- FileSystem fs = snapshotDir.getFileSystem();
- Path hintFile = new Path(snapshotDir, fileName);
- Path tempFile = new Path(snapshotDir, UUID.randomUUID() + "-" + fileName + ".temp");
- FileUtils.writeFileUtf8(tempFile, String.valueOf(snapshotId));
- fs.delete(hintFile, false);
- boolean success = fs.rename(tempFile, hintFile);
- if (!success) {
- fs.delete(tempFile, false);
- }
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
new file mode 100644
index 00000000..8bdbc015
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.Snapshot;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.BinaryOperator;
+
+import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
+
+/** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */
+public class SnapshotManager {
+
+ private static final String SNAPSHOT_PREFIX = "snapshot-";
+ public static final String EARLIEST = "EARLIEST";
+ public static final String LATEST = "LATEST";
+
+ private final Path tablePath;
+
+ public SnapshotManager(Path tablePath) {
+ this.tablePath = tablePath;
+ }
+
+ public Path snapshotDirectory() {
+ return new Path(tablePath + "/snapshot");
+ }
+
+ public Path snapshotPath(long snapshotId) {
+ return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
+ }
+
+ public Path tmpSnapshotPath(long id) {
+ return new Path(tablePath + "/snapshot/." + SNAPSHOT_PREFIX + id + "-" + UUID.randomUUID());
+ }
+
+ public Snapshot snapshot(long snapshotId) {
+ return Snapshot.fromPath(snapshotPath(snapshotId));
+ }
+
+ public boolean snapshotExists(long snapshotId) {
+ Path path = snapshotPath(snapshotId);
+ try {
+ return path.getFileSystem().exists(path);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to determine if snapshot #" + snapshotId + " exists in path " + path,
+ e);
+ }
+ }
+
+ public @Nullable Long latestSnapshotId() {
+ try {
+ return findLatest();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find latest snapshot id", e);
+ }
+ }
+
+ public Long findLatest() throws IOException {
+ Path snapshotDir = snapshotDirectory();
+ FileSystem fs = snapshotDir.getFileSystem();
+ if (!fs.exists(snapshotDir)) {
+ return null;
+ }
+
+ Long snapshotId = readHint(LATEST);
+ if (snapshotId != null) {
+ long nextSnapshot = snapshotId + 1;
+ // it is the latest only there is no next one
+ if (!snapshotExists(nextSnapshot)) {
+ return snapshotId;
+ }
+ }
+
+ return findByListFiles(Math::max);
+ }
+
+ public Long findEarliest() throws IOException {
+ Path snapshotDir = snapshotDirectory();
+ FileSystem fs = snapshotDir.getFileSystem();
+ if (!fs.exists(snapshotDir)) {
+ return null;
+ }
+
+ Long snapshotId = readHint(EARLIEST);
+ // null and it is the earliest only it exists
+ if (snapshotId != null && snapshotExists(snapshotId)) {
+ return snapshotId;
+ }
+
+ return findByListFiles(Math::min);
+ }
+
+ public Long readHint(String fileName) throws IOException {
+ Path snapshotDir = snapshotDirectory();
+ Path path = new Path(snapshotDir, fileName);
+ if (path.getFileSystem().exists(path)) {
+ return Long.parseLong(FileUtils.readFileUtf8(path));
+ }
+ return null;
+ }
+
+ private Long findByListFiles(BinaryOperator<Long> reducer) throws IOException {
+ Path snapshotDir = snapshotDirectory();
+ return listVersionedFiles(snapshotDir, SNAPSHOT_PREFIX).reduce(reducer).orElse(null);
+ }
+
+ public void commitLatestHint(long snapshotId) throws IOException {
+ commitHint(snapshotId, LATEST);
+ }
+
+ public void commitEarliestHint(long snapshotId) throws IOException {
+ commitHint(snapshotId, EARLIEST);
+ }
+
+ private void commitHint(long snapshotId, String fileName) throws IOException {
+ Path snapshotDir = snapshotDirectory();
+ FileSystem fs = snapshotDir.getFileSystem();
+ Path hintFile = new Path(snapshotDir, fileName);
+ Path tempFile = new Path(snapshotDir, UUID.randomUUID() + "-" + fileName + ".temp");
+ FileUtils.writeFileUtf8(tempFile, String.valueOf(snapshotId));
+ fs.delete(hintFile, false);
+ boolean success = fs.rename(tempFile, hintFile);
+ if (!success) {
+ fs.delete(tempFile, false);
+ }
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 6d52c0b3..6f7a34d4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -18,8 +18,9 @@
package org.apache.flink.table.store.table;
-import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.types.logical.RowType;
@@ -46,10 +47,15 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
return schema.logicalRowType();
}
+ @Override
+ public SnapshotManager snapshotManager() {
+ return store().snapshotManager();
+ }
+
@Override
public TableCommit newCommit() {
return new TableCommit(store().newCommit(), store().newExpire());
}
- protected abstract FileStore store();
+ protected abstract FileStoreImpl store();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 70e31059..26472592 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.table;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
@@ -111,7 +110,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public FileStore store() {
+ public FileStoreImpl store() {
return store;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 071e8f56..caa98e3b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.table;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
@@ -134,7 +133,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public FileStore store() {
+ public FileStoreImpl store() {
return store;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index fb56416b..f02259e6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
@@ -183,7 +182,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public FileStore store() {
+ public FileStoreImpl store() {
return store;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 65645f76..7bd7feb6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
@@ -36,6 +37,8 @@ public interface FileStoreTable extends Serializable {
RowType rowType();
+ SnapshotManager snapshotManager();
+
TableScan newScan();
TableRead newRead();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index e1688f2b..a8180385 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -38,7 +38,7 @@ import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.QuadFunction;
@@ -131,6 +131,7 @@ public class TestFileStore extends FileStoreImpl {
numRetainedMax,
millisRetained,
pathFactory(),
+ snapshotManager(),
manifestFileFactory(),
manifestListFactory());
}
@@ -216,13 +217,13 @@ public class TestFileStore extends FileStoreImpl {
}
}
- FileStorePathFactory pathFactory = pathFactory();
- Long snapshotIdBeforeCommit = pathFactory.latestSnapshotId();
+ SnapshotManager snapshotManager = snapshotManager();
+ Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId();
if (snapshotIdBeforeCommit == null) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
commitFunction.accept(commit, committable);
- Long snapshotIdAfterCommit = pathFactory.latestSnapshotId();
+ Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
if (snapshotIdAfterCommit == null) {
snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
@@ -240,7 +241,7 @@ public class TestFileStore extends FileStoreImpl {
List<Snapshot> snapshots = new ArrayList<>();
for (long id = snapshotIdBeforeCommit + 1; id <= snapshotIdAfterCommit; id++) {
- snapshots.add(Snapshot.fromPath(pathFactory.toSnapshotPath(id)));
+ snapshots.add(snapshotManager.snapshot(id));
}
return snapshots;
}
@@ -320,18 +321,19 @@ public class TestFileStore extends FileStoreImpl {
// possibly not the most accurate, so this check is only.
// - latest should < true_latest
// - earliest should < true_earliest
- Path snapshotDir = pathFactory().snapshotDirectory();
- Path earliest = new Path(snapshotDir, SnapshotFinder.EARLIEST);
- Path latest = new Path(snapshotDir, SnapshotFinder.LATEST);
+ SnapshotManager snapshotManager = snapshotManager();
+ Path snapshotDir = snapshotManager.snapshotDirectory();
+ Path earliest = new Path(snapshotDir, SnapshotManager.EARLIEST);
+ Path latest = new Path(snapshotDir, SnapshotManager.LATEST);
if (actualFiles.remove(earliest)) {
- long earliestId = SnapshotFinder.readHint(snapshotDir, SnapshotFinder.EARLIEST);
+ long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST);
earliest.getFileSystem().delete(earliest, false);
- assertThat(earliestId <= SnapshotFinder.findEarliest(snapshotDir)).isTrue();
+ assertThat(earliestId <= snapshotManager.findEarliest()).isTrue();
}
if (actualFiles.remove(latest)) {
- long latestId = SnapshotFinder.readHint(snapshotDir, SnapshotFinder.LATEST);
+ long latestId = snapshotManager.readHint(SnapshotManager.LATEST);
latest.getFileSystem().delete(latest, false);
- assertThat(latestId <= SnapshotFinder.findLatest(snapshotDir)).isTrue();
+ assertThat(latestId <= snapshotManager.findLatest()).isTrue();
}
actualFiles.remove(latest);
@@ -345,27 +347,23 @@ public class TestFileStore extends FileStoreImpl {
FileStorePathFactory.DataFilePathFactoryCache dataFilePathFactoryCache =
new FileStorePathFactory.DataFilePathFactoryCache(pathFactory);
- Long latestSnapshotId = pathFactory.latestSnapshotId();
+ SnapshotManager snapshotManager = snapshotManager();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (latestSnapshotId == null) {
return Collections.emptySet();
}
long firstInUseSnapshotId = Snapshot.FIRST_SNAPSHOT_ID;
for (long id = latestSnapshotId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
- Path snapshotPath = pathFactory.toSnapshotPath(id);
- try {
- if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
- firstInUseSnapshotId = id + 1;
- break;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if (!snapshotManager.snapshotExists(id)) {
+ firstInUseSnapshotId = id + 1;
+ break;
}
}
Set<Path> result = new HashSet<>();
for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) {
- Path snapshotPath = pathFactory.toSnapshotPath(id);
+ Path snapshotPath = snapshotManager.snapshotPath(id);
Snapshot snapshot = Snapshot.fromPath(snapshotPath);
// snapshot file
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index ed270788..5c07c31e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -28,9 +28,8 @@ import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.junit.jupiter.api.BeforeEach;
@@ -41,7 +40,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -63,7 +61,7 @@ public class FileStoreCommitTest {
@TempDir java.nio.file.Path tempDir;
@BeforeEach
- public void beforeEach() throws IOException {
+ public void beforeEach() {
gen = new TestKeyValueGenerator();
// for failure tests
FailingAtomicRenameFileSystem.get().reset(100, 5000);
@@ -84,17 +82,18 @@ public class FileStoreCommitTest {
@Test
public void testLatestHint() throws Exception {
testRandomConcurrentNoConflict(1, false);
- Path snapshotDir = createStore(false, 1).pathFactory().snapshotDirectory();
- Path latest = new Path(snapshotDir, SnapshotFinder.LATEST);
+ SnapshotManager snapshotManager = createStore(false, 1).snapshotManager();
+ Path snapshotDir = snapshotManager.snapshotDirectory();
+ Path latest = new Path(snapshotDir, SnapshotManager.LATEST);
assertThat(latest.getFileSystem().exists(latest)).isTrue();
- Long latestId = SnapshotFinder.findLatest(snapshotDir);
+ Long latestId = snapshotManager.findLatest();
// remove latest hint file
latest.getFileSystem().delete(latest, false);
- assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(latestId);
+ assertThat(snapshotManager.findLatest()).isEqualTo(latestId);
}
@Test
@@ -102,8 +101,8 @@ public class FileStoreCommitTest {
testRandomConcurrentNoConflict(1, false);
// remove first snapshot to mimic expiration
TestFileStore store = createStore(false);
- FileStorePathFactory pathFactory = store.pathFactory();
- Path firstSnapshotPath = pathFactory.toSnapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
+ SnapshotManager snapshotManager = store.snapshotManager();
+ Path firstSnapshotPath = snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
FileUtils.deleteOrWarn(firstSnapshotPath);
// this test succeeds if this call does not fail
store.newCommit()
@@ -159,7 +158,7 @@ public class FileStoreCommitTest {
.collect(Collectors.toList()));
// read actual data and compare
- Long snapshotId = store.pathFactory().latestSnapshotId();
+ Long snapshotId = store.snapshotManager().latestSnapshotId();
assertThat(snapshotId).isNotNull();
List<KeyValue> actualKvs = store.readKvsFromSnapshot(snapshotId);
gen.sort(actualKvs);
@@ -231,7 +230,7 @@ public class FileStoreCommitTest {
Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
List<KeyValue> actualKvs =
- store.readKvsFromSnapshot(store.pathFactory().latestSnapshotId());
+ store.readKvsFromSnapshot(store.snapshotManager().latestSnapshotId());
gen.sort(actualKvs);
Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
@@ -276,9 +275,8 @@ public class FileStoreCommitTest {
store.commitData(
Collections.emptyList(), gen::getPartition, kv -> 0, Collections.emptyMap());
- Path snapshotDir = store.pathFactory().snapshotDirectory();
- assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(snapshot.id());
+ assertThat(store.snapshotManager().findLatest()).isEqualTo(snapshot.id());
}
private TestFileStore createStore(boolean failing) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 25516460..7b9bc4b0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
@@ -26,8 +25,7 @@ import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -48,7 +46,7 @@ public class FileStoreExpireTest {
private TestKeyValueGenerator gen;
@TempDir java.nio.file.Path tempDir;
private TestFileStore store;
- private FileStorePathFactory pathFactory;
+ private SnapshotManager snapshotManager;
@BeforeEach
public void beforeEach() throws IOException {
@@ -62,7 +60,7 @@ public class FileStoreExpireTest {
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new DeduplicateMergeFunction());
- pathFactory = store.pathFactory();
+ snapshotManager = store.snapshotManager();
}
@AfterEach
@@ -75,7 +73,7 @@ public class FileStoreExpireTest {
FileStoreExpire expire = store.newExpire(1, 3, Long.MAX_VALUE);
expire.expire();
- assertThat(pathFactory.latestSnapshotId()).isNull();
+ assertThat(snapshotManager.latestSnapshotId()).isNull();
}
@Test
@@ -83,13 +81,12 @@ public class FileStoreExpireTest {
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
commit(2, allData, snapshotPositions);
- int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+ int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
FileStoreExpire expire = store.newExpire(1, latestSnapshotId + 1, Long.MAX_VALUE);
expire.expire();
- FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
for (int i = 1; i <= latestSnapshotId; i++) {
- assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+ assertThat(snapshotManager.snapshotExists(i)).isTrue();
assertSnapshot(i, allData, snapshotPositions);
}
}
@@ -99,13 +96,12 @@ public class FileStoreExpireTest {
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
commit(5, allData, snapshotPositions);
- int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+ int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
FileStoreExpire expire = store.newExpire(1, Integer.MAX_VALUE, Long.MAX_VALUE);
expire.expire();
- FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
for (int i = 1; i <= latestSnapshotId; i++) {
- assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+ assertThat(snapshotManager.snapshotExists(i)).isTrue();
assertSnapshot(i, allData, snapshotPositions);
}
}
@@ -118,17 +114,16 @@ public class FileStoreExpireTest {
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
commit(numRetainedMin + random.nextInt(5), allData, snapshotPositions);
- int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
+ int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
Thread.sleep(100);
FileStoreExpire expire = store.newExpire(numRetainedMin, Integer.MAX_VALUE, 1);
expire.expire();
- FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
for (int i = 1; i <= latestSnapshotId - numRetainedMin; i++) {
- assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isFalse();
+ assertThat(snapshotManager.snapshotExists(i)).isFalse();
}
for (int i = latestSnapshotId - numRetainedMin + 1; i <= latestSnapshotId; i++) {
- assertThat(fs.exists(pathFactory.toSnapshotPath(i))).isTrue();
+ assertThat(snapshotManager.snapshotExists(i)).isTrue();
assertSnapshot(i, allData, snapshotPositions);
}
}
@@ -143,31 +138,30 @@ public class FileStoreExpireTest {
commit(ThreadLocalRandom.current().nextInt(5) + 1, allData, snapshotPositions);
expire.expire();
- int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
- FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+ int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
for (int j = 1; j <= latestSnapshotId; j++) {
if (j > latestSnapshotId - 3) {
- assertThat(fs.exists(pathFactory.toSnapshotPath(j))).isTrue();
+ assertThat(snapshotManager.snapshotExists(j)).isTrue();
assertSnapshot(j, allData, snapshotPositions);
} else {
- assertThat(fs.exists(pathFactory.toSnapshotPath(j))).isFalse();
+ assertThat(snapshotManager.snapshotExists(j)).isFalse();
}
}
}
// validate earliest hint file
- Path snapshotDir = pathFactory.snapshotDirectory();
- Path earliest = new Path(snapshotDir, SnapshotFinder.EARLIEST);
+ Path snapshotDir = snapshotManager.snapshotDirectory();
+ Path earliest = new Path(snapshotDir, SnapshotManager.EARLIEST);
assertThat(earliest.getFileSystem().exists(earliest)).isTrue();
- Long earliestId = SnapshotFinder.findEarliest(snapshotDir);
+ Long earliestId = snapshotManager.findEarliest();
// remove earliest hint file
earliest.getFileSystem().delete(earliest, false);
- assertThat(SnapshotFinder.findEarliest(snapshotDir)).isEqualTo(earliestId);
+ assertThat(snapshotManager.findEarliest()).isEqualTo(earliestId);
}
@Test
@@ -184,12 +178,10 @@ public class FileStoreExpireTest {
expire.expire();
expire.expire();
- int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
- FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
+ int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
for (int i = 1; i <= latestSnapshotId; i++) {
- Path snapshotPath = pathFactory.toSnapshotPath(i);
- if (fs.exists(snapshotPath)) {
- assertThat(Snapshot.fromPath(snapshotPath).timeMillis())
+ if (snapshotManager.snapshotExists(i)) {
+ assertThat(snapshotManager.snapshot(i).timeMillis())
.isBetween(expireMillis - 1000, expireMillis);
assertSnapshot(i, allData, snapshotPositions);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
index e236d517..a3378ed1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
@@ -186,7 +186,8 @@ public class FileStoreReadTest {
store.commitData(data, partitionCalculator, kv -> 0);
FileStoreScan scan = store.newScan();
Map<BinaryRowData, List<ManifestEntry>> filesGroupedByPartition =
- scan.withSnapshot(store.pathFactory().latestSnapshotId()).plan().files().stream()
+ scan.withSnapshot(store.snapshotManager().latestSnapshotId()).plan().files()
+ .stream()
.collect(Collectors.groupingBy(ManifestEntry::partition));
FileStoreRead read = store.newRead();
if (keyProjection != null) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index e2ca788c..f6245761 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -28,14 +28,13 @@ import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.IntType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -55,10 +54,10 @@ public class FileStoreScanTest {
private TestKeyValueGenerator gen;
@TempDir java.nio.file.Path tempDir;
private TestFileStore store;
- private FileStorePathFactory pathFactory;
+ private SnapshotManager snapshotManager;
@BeforeEach
- public void beforeEach() throws IOException {
+ public void beforeEach() {
gen = new TestKeyValueGenerator();
store =
TestFileStore.create(
@@ -69,7 +68,7 @@ public class FileStoreScanTest {
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new DeduplicateMergeFunction());
- pathFactory = store.pathFactory();
+ snapshotManager = store.snapshotManager();
}
@Test
@@ -203,8 +202,8 @@ public class FileStoreScanTest {
}
ManifestList manifestList = store.manifestListFactory().create();
- long wantedSnapshotId = random.nextLong(pathFactory.latestSnapshotId()) + 1;
- Snapshot wantedSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshotId));
+ long wantedSnapshotId = random.nextLong(snapshotManager.latestSnapshotId()) + 1;
+ Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId);
List<ManifestFileMeta> wantedManifests = wantedSnapshot.readAllManifests(manifestList);
FileStoreScan scan = store.newScan();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
index cd3a7827..154ec613 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
@@ -66,15 +66,6 @@ public class FileStorePathFactoryTest {
.isEqualTo(new Path(tempDir.toString() + "/manifest/my-manifest-list-file-name"));
}
- @Test
- public void testSnapshotPath() {
- FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(tempDir.toString()));
- for (int i = 0; i < 20; i++) {
- assertThat(pathFactory.toSnapshotPath(i))
- .isEqualTo(new Path(tempDir.toString() + "/snapshot/snapshot-" + i));
- }
- }
-
@Test
public void testCreateDataFilePathFactoryNoPartition() {
FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(tempDir.toString()));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/SnapshotManagerTest.java
similarity index 52%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
copy to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/SnapshotManagerTest.java
index 65645f76..97525003 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/SnapshotManagerTest.java
@@ -16,31 +16,26 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.table;
+package org.apache.flink.table.store.file.utils;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.core.fs.Path;
-import java.io.Serializable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
-/**
- * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
- * and writing of {@link org.apache.flink.table.data.RowData}.
- */
-public interface FileStoreTable extends Serializable {
-
- String name();
-
- RowType rowType();
-
- TableScan newScan();
+import static org.assertj.core.api.Assertions.assertThat;
- TableRead newRead();
+/** Tests for {@link SnapshotManager}. */
+public class SnapshotManagerTest {
- TableWrite newWrite();
+ @TempDir java.nio.file.Path tempDir;
- TableCommit newCommit();
+ @Test
+ public void testSnapshotPath() {
+ SnapshotManager snapshotManager = new SnapshotManager(new Path(tempDir.toString()));
+ for (int i = 0; i < 20; i++) {
+ assertThat(snapshotManager.snapshotPath(i))
+ .isEqualTo(new Path(tempDir.toString() + "/snapshot/snapshot-" + i));
+ }
+ }
}