You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/03 06:18:13 UTC
[flink-table-store] branch master updated: [FLINK-26247] Optimize expire by only reading new changes in a snapshot
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 57134d3 [FLINK-26247] Optimize expire by only reading new changes in a snapshot
57134d3 is described below
commit 57134d35fdd7bf3db1fd5048207fc4bacf198667
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Mar 3 14:18:06 2022 +0800
[FLINK-26247] Optimize expire by only reading new changes in a snapshot
This closes #30
---
.../flink/table/store/file/FileStoreImpl.java | 7 +-
.../apache/flink/table/store/file/Snapshot.java | 41 +++++--
.../table/store/file/manifest/ManifestFile.java | 4 -
.../store/file/operation/FileStoreCommitImpl.java | 38 ++++--
.../store/file/operation/FileStoreExpireImpl.java | 128 +++++++++++++--------
.../store/file/operation/FileStoreScanImpl.java | 2 +-
.../flink/table/store/file/utils/FileUtils.java | 4 +
.../flink/table/store/file/TestFileStore.java | 87 +++++++++++++-
.../store/file/operation/FileStoreExpireTest.java | 6 +
.../store/file/operation/FileStoreScanTest.java | 10 +-
10 files changed, 245 insertions(+), 82 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index 3249c35..d670ef6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -78,7 +78,8 @@ public class FileStoreImpl implements FileStore {
options.path(), partitionType, options.partitionDefaultName());
}
- private ManifestFile.Factory manifestFileFactory() {
+ @VisibleForTesting
+ public ManifestFile.Factory manifestFileFactory() {
return new ManifestFile.Factory(
partitionType,
keyType,
@@ -141,8 +142,8 @@ public class FileStoreImpl implements FileStore {
options.snapshotNumRetain(),
options.snapshotTimeRetain().toMillis(),
pathFactory(),
- manifestListFactory(),
- newScan());
+ manifestFileFactory(),
+ manifestListFactory());
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index e30286d..6bb343b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.store.file;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -28,6 +30,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessin
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
/** This file is the entrance to all data committed at some specific time point. */
public class Snapshot {
@@ -35,7 +39,8 @@ public class Snapshot {
public static final long FIRST_SNAPSHOT_ID = 1;
private static final String FIELD_ID = "id";
- private static final String FIELD_MANIFEST_LIST = "manifestList";
+ private static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList";
+ private static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList";
private static final String FIELD_COMMIT_USER = "commitUser";
private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
private static final String FIELD_COMMIT_KIND = "commitKind";
@@ -44,8 +49,14 @@ public class Snapshot {
@JsonProperty(FIELD_ID)
private final long id;
- @JsonProperty(FIELD_MANIFEST_LIST)
- private final String manifestList;
+ // a manifest list recording all changes from the previous snapshots
+ @JsonProperty(FIELD_BASE_MANIFEST_LIST)
+ private final String baseManifestList;
+
+ // a manifest list recording all new changes that occurred in this snapshot
+ // for faster expire and streaming reads
+ @JsonProperty(FIELD_DELTA_MANIFEST_LIST)
+ private final String deltaManifestList;
@JsonProperty(FIELD_COMMIT_USER)
private final String commitUser;
@@ -63,13 +74,15 @@ public class Snapshot {
@JsonCreator
public Snapshot(
@JsonProperty(FIELD_ID) long id,
- @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
+ @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
+ @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
this.id = id;
- this.manifestList = manifestList;
+ this.baseManifestList = baseManifestList;
+ this.deltaManifestList = deltaManifestList;
this.commitUser = commitUser;
this.commitIdentifier = commitIdentifier;
this.commitKind = commitKind;
@@ -81,9 +94,14 @@ public class Snapshot {
return id;
}
- @JsonGetter(FIELD_MANIFEST_LIST)
- public String manifestList() {
- return manifestList;
+ @JsonGetter(FIELD_BASE_MANIFEST_LIST)
+ public String baseManifestList() {
+ return baseManifestList;
+ }
+
+ @JsonGetter(FIELD_DELTA_MANIFEST_LIST)
+ public String deltaManifestList() {
+ return deltaManifestList;
}
@JsonGetter(FIELD_COMMIT_USER)
@@ -114,6 +132,13 @@ public class Snapshot {
}
}
+ /**
+ * Reads the manifest metas referenced by this snapshot's base manifest list, followed by
+ * those referenced by its delta manifest list.
+ *
+ * @param manifestList reader used to load both manifest lists by their file names
+ * @return a newly created list holding the base manifest metas first, then the delta ones
+ */
+ public List<ManifestFileMeta> readAllManifests(ManifestList manifestList) {
+ List<ManifestFileMeta> result = new ArrayList<>();
+ result.addAll(manifestList.read(baseManifestList));
+ result.addAll(manifestList.read(deltaManifestList));
+ return result;
+ }
+
public static Snapshot fromJson(String json) {
try {
return new ObjectMapper().readValue(json, Snapshot.class);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 6382817..57ec343 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RollingFile;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,9 +89,6 @@ public class ManifestFile {
* <p>NOTE: This method is atomic.
*/
public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
- Preconditions.checkArgument(
- entries.size() > 0, "Manifest entries to write must not be empty.");
-
ManifestRollingFile rollingFile = new ManifestRollingFile();
List<ManifestFileMeta> result = new ArrayList<>();
List<Path> filesToCleanUp = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 1bfdf1b..9d0212f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -301,13 +301,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
Snapshot newSnapshot;
- String manifestListName = null;
+ String previousChangesListName = null;
+ String newChangesListName = null;
List<ManifestFileMeta> oldMetas = new ArrayList<>();
List<ManifestFileMeta> newMetas = new ArrayList<>();
try {
if (latestSnapshot != null) {
// read all previous manifest files
- oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+ oldMetas.addAll(latestSnapshot.readAllManifests(manifestList));
}
// merge manifest files with changes
newMetas.addAll(
@@ -316,16 +317,19 @@ public class FileStoreCommitImpl implements FileStoreCommit {
manifestFile,
manifestTargetSize.getBytes(),
manifestMergeMinCount));
+ previousChangesListName = manifestList.write(newMetas);
+
// write new changes into manifest files
- if (!changes.isEmpty()) {
- newMetas.addAll(manifestFile.write(changes));
- }
+ List<ManifestFileMeta> newChangesManifests = manifestFile.write(changes);
+ newMetas.addAll(newChangesManifests);
+ newChangesListName = manifestList.write(newChangesManifests);
+
// prepare snapshot file
- manifestListName = manifestList.write(newMetas);
newSnapshot =
new Snapshot(
newSnapshotId,
- manifestListName,
+ previousChangesListName,
+ newChangesListName,
commitUser,
identifier,
commitKind,
@@ -333,7 +337,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
- cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+ cleanUpTmpSnapshot(
+ tmpSnapshotPath,
+ previousChangesListName,
+ newChangesListName,
+ oldMetas,
+ newMetas);
throw new RuntimeException(
String.format(
"Exception occurs when preparing snapshot #%d (path %s) by user %s "
@@ -406,7 +415,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
commitUser,
identifier,
commitKind.name()));
- cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+ cleanUpTmpSnapshot(
+ tmpSnapshotPath, previousChangesListName, newChangesListName, oldMetas, newMetas);
return false;
}
@@ -457,14 +467,18 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private void cleanUpTmpSnapshot(
Path tmpSnapshotPath,
- String manifestListName,
+ String previousChangesListName,
+ String newChangesListName,
List<ManifestFileMeta> oldMetas,
List<ManifestFileMeta> newMetas) {
// clean up tmp snapshot file
FileUtils.deleteOrWarn(tmpSnapshotPath);
// clean up newly created manifest list
- if (manifestListName != null) {
- manifestList.delete(manifestListName);
+ if (previousChangesListName != null) {
+ manifestList.delete(previousChangesListName);
+ }
+ if (newChangesListName != null) {
+ manifestList.delete(newChangesListName);
}
// clean up newly merged manifest files
Set<ManifestFileMeta> oldMetaSet = new HashSet<>(oldMetas); // for faster searching
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index e7a03d9..e6dd7e3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -21,14 +21,19 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
/**
@@ -40,25 +45,27 @@ import java.util.Set;
*/
public class FileStoreExpireImpl implements FileStoreExpire {
+ private static final Logger LOG = LoggerFactory.getLogger(FileStoreExpireImpl.class);
+
// snapshots exceeding any constraint will be expired
private final int numRetained;
private final long millisRetained;
private final FileStorePathFactory pathFactory;
+ private final ManifestFile manifestFile;
private final ManifestList manifestList;
- private final FileStoreScan scan;
public FileStoreExpireImpl(
int numRetained,
long millisRetained,
FileStorePathFactory pathFactory,
- ManifestList.Factory manifestListFactory,
- FileStoreScan scan) {
+ ManifestFile.Factory manifestFileFactory,
+ ManifestList.Factory manifestListFactory) {
this.numRetained = numRetained;
this.millisRetained = millisRetained;
this.pathFactory = pathFactory;
+ this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
- this.scan = scan;
}
@Override
@@ -95,72 +102,99 @@ public class FileStoreExpireImpl implements FileStoreExpire {
expireUntil(latestSnapshotId);
}
- private void expireUntil(long exclusiveId) {
- if (exclusiveId <= Snapshot.FIRST_SNAPSHOT_ID) {
+ private void expireUntil(long endExclusiveId) {
+ if (endExclusiveId <= Snapshot.FIRST_SNAPSHOT_ID) {
// fast exit
return;
}
- Snapshot exclusiveSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(exclusiveId));
-
- // if sst file is only used in snapshots to expire but not in next snapshot we can delete it
- // because each sst file will only be added and deleted once
- Set<Path> sstInUse = new HashSet<>();
- FileStorePathFactory.SstPathFactoryCache sstPathFactoryCache =
- new FileStorePathFactory.SstPathFactoryCache(pathFactory);
- for (ManifestEntry entry : scan.withSnapshot(exclusiveId).plan().files()) {
- SstPathFactory sstPathFactory =
- sstPathFactoryCache.getSstPathFactory(entry.partition(), entry.bucket());
- sstInUse.add(sstPathFactory.toPath(entry.file().fileName()));
- }
-
- // the same with sst, manifests are only added and deleted once
- Set<ManifestFileMeta> manifestsInUse =
- new HashSet<>(manifestList.read(exclusiveSnapshot.manifestList()));
-
- Set<Path> sstToDelete = new HashSet<>();
- Set<String> manifestsToDelete = new HashSet<>();
-
- for (long id = exclusiveId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+ // find first snapshot to expire
+ long beginInclusiveId = Snapshot.FIRST_SNAPSHOT_ID;
+ for (long id = endExclusiveId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
Path snapshotPath = pathFactory.toSnapshotPath(id);
try {
if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
// only latest snapshots are retained, as we cannot find this snapshot, we can
// assume that all snapshots preceding it have been removed
+ beginInclusiveId = id + 1;
break;
}
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to determine if snapshot #" + id + " still exists", e);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
+ }
- Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+ // delete sst files
+ FileStorePathFactory.SstPathFactoryCache sstPathFactoryCache =
+ new FileStorePathFactory.SstPathFactoryCache(pathFactory);
+ // deleted sst files in a snapshot are not used by that snapshot, so the range of id should
+ // be (beginInclusiveId, endExclusiveId]
+ for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ready to delete sst files in snapshot #" + id);
+ }
+
+ Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+ List<ManifestFileMeta> deltaManifests = manifestList.read(toExpire.deltaManifestList());
- for (ManifestEntry entry : scan.withSnapshot(toExpire.id()).plan().files()) {
+ // we cannot delete an sst file directly when we meet a DELETE entry, because that
+ // file might be upgraded
+ Set<Path> sstToDelete = new HashSet<>();
+ for (ManifestFileMeta meta : deltaManifests) {
+ for (ManifestEntry entry : manifestFile.read(meta.fileName())) {
SstPathFactory sstPathFactory =
sstPathFactoryCache.getSstPathFactory(
entry.partition(), entry.bucket());
Path sstPath = sstPathFactory.toPath(entry.file().fileName());
- if (!sstInUse.contains(sstPath)) {
- sstToDelete.add(sstPath);
+ switch (entry.kind()) {
+ case ADD:
+ sstToDelete.remove(sstPath);
+ break;
+ case DELETE:
+ sstToDelete.add(sstPath);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " + entry.kind().name());
}
}
+ }
+ for (Path sst : sstToDelete) {
+ FileUtils.deleteOrWarn(sst);
+ }
+ }
- for (ManifestFileMeta manifest : manifestList.read(toExpire.manifestList())) {
- if (!manifestsInUse.contains(manifest)) {
- manifestsToDelete.add(manifest.fileName());
- }
- }
+ // delete manifests
+ Snapshot exclusiveSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(endExclusiveId));
+ Set<ManifestFileMeta> manifestsInUse =
+ new HashSet<>(exclusiveSnapshot.readAllManifests(manifestList));
+ // to avoid deleting twice
+ Set<ManifestFileMeta> deletedManifests = new HashSet<>();
+ for (long id = beginInclusiveId; id < endExclusiveId; id++) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ready to delete manifests in snapshot #" + id);
+ }
- manifestList.delete(toExpire.manifestList());
- FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
- } catch (IOException e) {
- throw new RuntimeException(
- "Failed to determine if snapshot #" + id + " still exists", e);
+ Snapshot toExpire = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+
+ for (ManifestFileMeta manifest : toExpire.readAllManifests(manifestList)) {
+ if (!manifestsInUse.contains(manifest) && !deletedManifests.contains(manifest)) {
+ manifestFile.delete(manifest.fileName());
+ deletedManifests.add(manifest);
+ }
}
- }
- for (Path sst : sstToDelete) {
- FileUtils.deleteOrWarn(sst);
- }
- for (String manifestName : manifestsToDelete) {
- FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(manifestName));
+ // delete manifest lists
+ manifestList.delete(toExpire.baseManifestList());
+ manifestList.delete(toExpire.deltaManifestList());
+
+ // delete snapshot
+ FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
}
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index 580d5af..d99a792 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -155,7 +155,7 @@ public class FileStoreScanImpl implements FileStoreScan {
manifests = Collections.emptyList();
} else {
Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
- manifests = manifestList.read(snapshot.manifestList());
+ manifests = snapshot.readAllManifests(manifestList);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
index 50cce1d..e797a28 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -110,6 +110,10 @@ public class FileUtils {
}
public static void deleteOrWarn(Path file) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ready to delete " + file.toString());
+ }
+
try {
FileSystem fs = file.getFileSystem();
if (!fs.delete(file, false) && fs.exists(file)) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 38b4057..0057435 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -20,10 +20,13 @@ package org.apache.flink.table.store.file;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
@@ -31,6 +34,7 @@ import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -42,24 +46,32 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** {@link FileStore} for tests. */
public class TestFileStore extends FileStoreImpl {
private static final Logger LOG = LoggerFactory.getLogger(TestFileStore.class);
+ private final String root;
private final RowDataSerializer keySerializer;
private final RowDataSerializer valueSerializer;
@@ -96,13 +108,18 @@ public class TestFileStore extends FileStoreImpl {
RowType valueType,
Accumulator accumulator) {
super(conf, UUID.randomUUID().toString(), partitionType, keyType, valueType, accumulator);
+ this.root = conf.getString(FileStoreOptions.FILE_PATH);
this.keySerializer = new RowDataSerializer(keyType);
this.valueSerializer = new RowDataSerializer(valueType);
}
public FileStoreExpireImpl newExpire(int numRetained, long millisRetained) {
return new FileStoreExpireImpl(
- numRetained, millisRetained, pathFactory(), manifestListFactory(), newScan());
+ numRetained,
+ millisRetained,
+ pathFactory(),
+ manifestFileFactory(),
+ manifestListFactory());
}
public List<Snapshot> commitData(
@@ -264,4 +281,72 @@ public class TestFileStore extends FileStoreImpl {
}
return result;
}
+
+ /**
+ * Asserts that the regular files found under the store root are exactly the files
+ * returned by {@code getFilesInUse()}, i.e. that nothing unreferenced is left on disk and
+ * nothing still referenced has been removed.
+ *
+ * @throws RuntimeException if walking the root directory fails with an {@link IOException}
+ */
+ public void assertCleaned() {
+ Set<Path> filesInUse = getFilesInUse();
+ Set<Path> actualFiles;
+ // Files.walk keeps directory handles open until the returned stream is closed, so the
+ // stream must be managed by try-with-resources. Path in this file is Flink's Path, hence
+ // the fully-qualified java.nio.file.Path.
+ try (java.util.stream.Stream<java.nio.file.Path> walked =
+ Files.walk(Paths.get(root))) {
+ actualFiles =
+ walked.filter(p -> Files.isRegularFile(p))
+ .map(p -> new Path(p.toString()))
+ .collect(Collectors.toSet());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ assertThat(actualFiles).isEqualTo(filesInUse);
+ }
+
+ /**
+ * Collects the paths of all files referenced by the snapshots that still exist: the
+ * snapshot files themselves, their base and delta manifest lists, the manifests recorded
+ * in those lists, and the sst files of the entries returned by a scan planned over those
+ * manifests.
+ *
+ * <p>The existing snapshots are assumed to form one contiguous id range ending at the
+ * latest snapshot id; the range start is found by walking backwards until a snapshot file
+ * is missing.
+ *
+ * @return the set of paths in use, or an empty set if {@code latestSnapshotId()} is null
+ */
+ private Set<Path> getFilesInUse() {
+ FileStorePathFactory pathFactory = pathFactory();
+ ManifestList manifestList = manifestListFactory().create();
+ FileStoreScan scan = newScan();
+ FileStorePathFactory.SstPathFactoryCache sstPathFactoryCache =
+ new FileStorePathFactory.SstPathFactoryCache(pathFactory);
+
+ // NOTE(review): null presumably means no snapshot has been committed yet - confirm
+ Long latestSnapshotId = pathFactory.latestSnapshotId();
+ if (latestSnapshotId == null) {
+ return Collections.emptySet();
+ }
+
+ // walk backwards from the snapshot before the latest one; the first missing snapshot
+ // file marks the end of the range, so the snapshot right after it is the first in use
+ long firstInUseSnapshotId = Snapshot.FIRST_SNAPSHOT_ID;
+ for (long id = latestSnapshotId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
+ Path snapshotPath = pathFactory.toSnapshotPath(id);
+ try {
+ if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
+ firstInUseSnapshotId = id + 1;
+ break;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // gather everything referenced by each snapshot in [firstInUseSnapshotId, latestSnapshotId]
+ Set<Path> result = new HashSet<>();
+ for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) {
+ Path snapshotPath = pathFactory.toSnapshotPath(id);
+ Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+
+ // snapshot file
+ result.add(snapshotPath);
+
+ // manifest lists (both the base list and the delta list of this snapshot)
+ result.add(pathFactory.toManifestListPath(snapshot.baseManifestList()));
+ result.add(pathFactory.toManifestListPath(snapshot.deltaManifestList()));
+
+ // manifests recorded in the two manifest lists
+ List<ManifestFileMeta> manifests = snapshot.readAllManifests(manifestList);
+ manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
+
+ // sst files of the entries returned by a scan planned over this snapshot's manifests
+ List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
+ for (ManifestEntry entry : entries) {
+ result.add(
+ sstPathFactoryCache
+ .getSstPathFactory(entry.partition(), entry.bucket())
+ .toPath(entry.file().fileName()));
+ }
+ }
+ return result;
+ }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index e007ca0..bb12a2f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -65,6 +66,11 @@ public class FileStoreExpireTest {
pathFactory = store.pathFactory();
}
+ /**
+ * Runs after every test and calls {@code store.assertCleaned()}, which compares the files
+ * actually present under the store root with the files still in use.
+ */
+ @AfterEach
+ public void afterEach() {
+ store.assertCleaned();
+ }
+
@Test
public void testNoSnapshot() {
FileStoreExpire expire = store.newExpire(3, Long.MAX_VALUE);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index bde976a..1c5d1b6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -161,16 +161,14 @@ public class FileStoreScanTest {
}
ManifestList manifestList = store.manifestListFactory().create();
- long wantedSnapshot = random.nextLong(pathFactory.latestSnapshotId()) + 1;
- List<ManifestFileMeta> wantedManifests =
- manifestList.read(
- Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshot))
- .manifestList());
+ long wantedSnapshotId = random.nextLong(pathFactory.latestSnapshotId()) + 1;
+ Snapshot wantedSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshotId));
+ List<ManifestFileMeta> wantedManifests = wantedSnapshot.readAllManifests(manifestList);
FileStoreScan scan = store.newScan();
scan.withManifestList(wantedManifests);
- List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshot);
+ List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshotId);
gen.sort(expectedKvs);
Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
runTest(scan, null, expected);