You are viewing a plain text version of this content. The canonical version is the original message in the mailing-list archive (its hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by cz...@apache.org on 2023/01/10 10:39:12 UTC
[flink-table-store] branch release-0.3 updated: [FLINK-30611] Expiration can be performed with missing files
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.3 by this push:
new 1f0f28c3 [FLINK-30611] Expiration can be performed with missing files
1f0f28c3 is described below
commit 1f0f28c3cf9c680ba9294362f3a4ba128236f8ab
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Jan 10 18:38:05 2023 +0800
[FLINK-30611] Expiration can be performed with missing files
This closes #474.
(cherry picked from commit 929a411f29f4fc76dfead6e716daae56c165724c)
---
.../table/store/file/manifest/ManifestFile.java | 23 ----
.../store/file/operation/FileStoreExpireImpl.java | 48 ++++++++-
.../flink/table/store/file/TestFileStore.java | 64 ++++++-----
...reTest.java => CleanedFileStoreExpireTest.java} | 90 +---------------
.../file/operation/FileStoreExpireTestBase.java | 119 +++++++++++++++++++++
.../operation/UncleanedFileStoreExpireTest.java | 84 +++++++++++++++
6 files changed, 288 insertions(+), 140 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index c6dbab0b..77b26de2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -35,13 +35,8 @@ import org.apache.flink.table.store.format.FieldStatsCollector;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
-
import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
/**
* This file includes several {@link ManifestEntry}s, representing the additional changes since last
@@ -91,24 +86,6 @@ public class ManifestFile {
}
}
- public Iterable<ManifestEntry> readManifestFiles(List<String> manifestFiles) {
- Queue<String> files = new LinkedList<>(manifestFiles);
- return Iterables.concat(
- (Iterable<Iterable<ManifestEntry>>)
- () ->
- new Iterator<Iterable<ManifestEntry>>() {
- @Override
- public boolean hasNext() {
- return files.size() > 0;
- }
-
- @Override
- public Iterable<ManifestEntry> next() {
- return read(files.poll());
- }
- });
- }
-
/**
* Write several {@link ManifestEntry}s into manifest files.
*
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index 5f85cb8a..fd7012f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -29,14 +29,20 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
@@ -184,9 +190,14 @@ public class FileStoreExpireImpl implements FileStoreExpire {
}
Snapshot toExpire = snapshotManager.snapshot(id);
+ // cannot call `toExpire.readAllDataManifests` directly, it is possible that a job is
+ // killed during expiration, so some manifest files may have been deleted
+ List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
+ toExpireManifests.addAll(tryReadManifestList(toExpire.baseManifestList()));
+ toExpireManifests.addAll(tryReadManifestList(toExpire.deltaManifestList()));
// delete manifest
- for (ManifestFileMeta manifest : toExpire.readAllDataManifests(manifestList)) {
+ for (ManifestFileMeta manifest : toExpireManifests) {
if (!manifestsInUse.contains(manifest) && !deletedManifests.contains(manifest)) {
manifestFile.delete(manifest.fileName());
deletedManifests.add(manifest);
@@ -194,7 +205,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
}
if (toExpire.changelogManifestList() != null) {
for (ManifestFileMeta manifest :
- manifestList.read(toExpire.changelogManifestList())) {
+ tryReadManifestList(toExpire.changelogManifestList())) {
manifestFile.delete(manifest.fileName());
}
}
@@ -260,10 +271,39 @@ public class FileStoreExpireImpl implements FileStoreExpire {
private Iterable<ManifestEntry> getManifestEntriesFromManifestList(String manifestListName) {
List<String> manifestFiles =
- manifestList.read(manifestListName).stream()
+ tryReadManifestList(manifestListName).stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toList());
- return manifestFile.readManifestFiles(manifestFiles);
+ Queue<String> files = new LinkedList<>(manifestFiles);
+ return Iterables.concat(
+ (Iterable<Iterable<ManifestEntry>>)
+ () ->
+ new Iterator<Iterable<ManifestEntry>>() {
+ @Override
+ public boolean hasNext() {
+ return files.size() > 0;
+ }
+
+ @Override
+ public Iterable<ManifestEntry> next() {
+ String file = files.poll();
+ try {
+ return manifestFile.read(file);
+ } catch (Exception e) {
+ LOG.warn("Failed to read manifest file " + file, e);
+ return Collections.emptyList();
+ }
+ }
+ });
+ }
+
+ private List<ManifestFileMeta> tryReadManifestList(String manifestListName) {
+ try {
+ return manifestList.read(manifestListName);
+ } catch (Exception e) {
+ LOG.warn("Failed to read manifest list file " + manifestListName, e);
+ return Collections.emptyList();
+ }
}
private void writeEarliestHint(long earliest) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 376befa8..0c61503f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -404,9 +404,6 @@ public class TestFileStore extends KeyValueFileStore {
private Set<Path> getFilesInUse() {
Set<Path> result = new HashSet<>();
- FileStorePathFactory pathFactory = pathFactory();
- ManifestList manifestList = manifestListFactory().create();
- FileStoreScan scan = newScan();
SchemaManager schemaManager = new SchemaManager(options.path());
schemaManager.listAllIds().forEach(id -> result.add(schemaManager.toSchemaPath(id)));
@@ -427,35 +424,48 @@ public class TestFileStore extends KeyValueFileStore {
}
for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) {
- Path snapshotPath = snapshotManager.snapshotPath(id);
- Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+ result.addAll(getFilesInUse(id));
+ }
+ return result;
+ }
- // snapshot file
- result.add(snapshotPath);
+ public Set<Path> getFilesInUse(long snapshotId) {
+ Set<Path> result = new HashSet<>();
- // manifest lists
- result.add(pathFactory.toManifestListPath(snapshot.baseManifestList()));
- result.add(pathFactory.toManifestListPath(snapshot.deltaManifestList()));
- if (snapshot.changelogManifestList() != null) {
- result.add(pathFactory.toManifestListPath(snapshot.changelogManifestList()));
- }
+ SnapshotManager snapshotManager = snapshotManager();
+ FileStorePathFactory pathFactory = pathFactory();
+ ManifestList manifestList = manifestListFactory().create();
+ FileStoreScan scan = newScan();
- // manifests
- List<ManifestFileMeta> manifests = snapshot.readAllDataManifests(manifestList);
- if (snapshot.changelogManifestList() != null) {
- manifests.addAll(manifestList.read(snapshot.changelogManifestList()));
- }
- manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
+ Path snapshotPath = snapshotManager.snapshotPath(snapshotId);
+ Snapshot snapshot = Snapshot.fromPath(snapshotPath);
- // data file
- List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
- for (ManifestEntry entry : entries) {
- result.add(
- new Path(
- pathFactory.bucketPath(entry.partition(), entry.bucket()),
- entry.file().fileName()));
- }
+ // snapshot file
+ result.add(snapshotPath);
+
+ // manifest lists
+ result.add(pathFactory.toManifestListPath(snapshot.baseManifestList()));
+ result.add(pathFactory.toManifestListPath(snapshot.deltaManifestList()));
+ if (snapshot.changelogManifestList() != null) {
+ result.add(pathFactory.toManifestListPath(snapshot.changelogManifestList()));
}
+
+ // manifests
+ List<ManifestFileMeta> manifests = snapshot.readAllDataManifests(manifestList);
+ if (snapshot.changelogManifestList() != null) {
+ manifests.addAll(manifestList.read(snapshot.changelogManifestList()));
+ }
+ manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
+
+ // data file
+ List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
+ for (ManifestEntry entry : entries) {
+ result.add(
+ new Path(
+ pathFactory.bucketPath(entry.partition(), entry.bucket()),
+ entry.file().fileName()));
+ }
+
return result;
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/CleanedFileStoreExpireTest.java
similarity index 68%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/CleanedFileStoreExpireTest.java
index 8c4da4c2..a03717d9 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/CleanedFileStoreExpireTest.java
@@ -21,82 +21,29 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.TestFileStore;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link FileStoreExpireImpl}. */
-public class FileStoreExpireTest {
-
- private TestKeyValueGenerator gen;
- @TempDir java.nio.file.Path tempDir;
- private TestFileStore store;
- private SnapshotManager snapshotManager;
-
- @BeforeEach
- public void beforeEach() throws Exception {
- gen = new TestKeyValueGenerator();
- store = createStore();
- snapshotManager = store.snapshotManager();
- SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
- schemaManager.commitNewVersion(
- new UpdateSchema(
- TestKeyValueGenerator.DEFAULT_ROW_TYPE,
- TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
- TestKeyValueGenerator.getPrimaryKeys(
- TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
- Collections.emptyMap(),
- null));
- }
-
- private TestFileStore createStore() {
- ThreadLocalRandom random = ThreadLocalRandom.current();
-
- CoreOptions.ChangelogProducer changelogProducer;
- if (random.nextBoolean()) {
- changelogProducer = CoreOptions.ChangelogProducer.INPUT;
- } else {
- changelogProducer = CoreOptions.ChangelogProducer.NONE;
- }
-
- return new TestFileStore.Builder(
- "avro",
- tempDir.toString(),
- 1,
- TestKeyValueGenerator.DEFAULT_PART_TYPE,
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.DEFAULT_ROW_TYPE,
- TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
- DeduplicateMergeFunction.factory())
- .changelogProducer(changelogProducer)
- .build();
- }
+/**
+ * Tests for {@link FileStoreExpireImpl}. After expiration, only useful files should be retained.
+ */
+public class CleanedFileStoreExpireTest extends FileStoreExpireTestBase {
@AfterEach
public void afterEach() throws IOException {
@@ -265,33 +212,4 @@ public class FileStoreExpireTest {
}
}
}
-
- private void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions)
- throws Exception {
- for (int i = 0; i < numCommits; i++) {
- int numRecords = ThreadLocalRandom.current().nextInt(100) + 1;
- List<KeyValue> data = new ArrayList<>();
- for (int j = 0; j < numRecords; j++) {
- data.add(gen.next());
- }
- allData.addAll(data);
- List<Snapshot> snapshots = store.commitData(data, gen::getPartition, kv -> 0);
- for (int j = 0; j < snapshots.size(); j++) {
- snapshotPositions.add(allData.size());
- }
- }
- }
-
- private void assertSnapshot(
- int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions)
- throws Exception {
- Map<BinaryRowData, BinaryRowData> expected =
- store.toKvMap(allData.subList(0, snapshotPositions.get(snapshotId - 1)));
- List<KeyValue> actualKvs =
- store.readKvsFromManifestEntries(
- store.newScan().withSnapshot(snapshotId).plan().files(), false);
- gen.sort(actualKvs);
- Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
- assertThat(actual).isEqualTo(expected);
- }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTestBase.java
new file mode 100644
index 00000000..3cddfb60
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTestBase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestFileStore;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for {@link FileStoreExpireImpl}. */
+public class FileStoreExpireTestBase {
+
+ protected TestKeyValueGenerator gen;
+ @TempDir java.nio.file.Path tempDir;
+ protected TestFileStore store;
+ protected SnapshotManager snapshotManager;
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ gen = new TestKeyValueGenerator();
+ store = createStore();
+ snapshotManager = store.snapshotManager();
+ SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.emptyMap(),
+ null));
+ }
+
+ private TestFileStore createStore() {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ CoreOptions.ChangelogProducer changelogProducer;
+ if (random.nextBoolean()) {
+ changelogProducer = CoreOptions.ChangelogProducer.INPUT;
+ } else {
+ changelogProducer = CoreOptions.ChangelogProducer.NONE;
+ }
+
+ return new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ 1,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory())
+ .changelogProducer(changelogProducer)
+ .build();
+ }
+
+ protected void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions)
+ throws Exception {
+ for (int i = 0; i < numCommits; i++) {
+ int numRecords = ThreadLocalRandom.current().nextInt(100) + 1;
+ List<KeyValue> data = new ArrayList<>();
+ for (int j = 0; j < numRecords; j++) {
+ data.add(gen.next());
+ }
+ allData.addAll(data);
+ List<Snapshot> snapshots = store.commitData(data, gen::getPartition, kv -> 0);
+ for (int j = 0; j < snapshots.size(); j++) {
+ snapshotPositions.add(allData.size());
+ }
+ }
+ }
+
+ protected void assertSnapshot(
+ int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions)
+ throws Exception {
+ Map<BinaryRowData, BinaryRowData> expected =
+ store.toKvMap(allData.subList(0, snapshotPositions.get(snapshotId - 1)));
+ List<KeyValue> actualKvs =
+ store.readKvsFromManifestEntries(
+ store.newScan().withSnapshot(snapshotId).plan().files(), false);
+ gen.sort(actualKvs);
+ Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
+ assertThat(actual).isEqualTo(expected);
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/UncleanedFileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/UncleanedFileStoreExpireTest.java
new file mode 100644
index 00000000..0f7b38df
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/UncleanedFileStoreExpireTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link FileStoreExpireImpl}. Some files not in use may still remain after the test due
+ * to the testing methods.
+ */
+public class UncleanedFileStoreExpireTest extends FileStoreExpireTestBase {
+
+ @Test
+ public void testExpireWithMissingFiles() throws Exception {
+ FileStoreExpire expire = store.newExpire(1, 1, 1);
+
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+ commit(5, allData, snapshotPositions);
+
+ int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
+ Set<Path> filesInUse = store.getFilesInUse(latestSnapshotId);
+ List<Path> unusedFileList =
+ Files.walk(Paths.get(tempDir.toString()))
+ .filter(Files::isRegularFile)
+ .filter(p -> !p.getFileName().toString().startsWith("snapshot"))
+ .filter(p -> !p.getFileName().toString().startsWith("schema"))
+ .map(p -> new Path(p.toString()))
+ .filter(p -> !filesInUse.contains(p))
+ .collect(Collectors.toList());
+
+ // shuffle list
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int i = unusedFileList.size() - 1; i > 0; i--) {
+ int j = random.nextInt(i + 1);
+ Collections.swap(unusedFileList, i, j);
+ }
+
+ // delete some unused files
+ int numFilesToDelete = random.nextInt(unusedFileList.size());
+ for (int i = 0; i < numFilesToDelete; i++) {
+ FileUtils.deleteOrWarn(unusedFileList.get(i));
+ }
+
+ expire.expire();
+
+ for (int i = 1; i < latestSnapshotId; i++) {
+ assertThat(snapshotManager.snapshotExists(i)).isFalse();
+ }
+ assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue();
+ assertSnapshot(latestSnapshotId, allData, snapshotPositions);
+ }
+}