You are viewing a plain text version of this content. The canonical link for it is here (the link target was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by cz...@apache.org on 2023/01/10 10:39:12 UTC

[flink-table-store] branch release-0.3 updated: [FLINK-30611] Expiration can be performed with missing files

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.3 by this push:
     new 1f0f28c3 [FLINK-30611] Expiration can be performed with missing files
1f0f28c3 is described below

commit 1f0f28c3cf9c680ba9294362f3a4ba128236f8ab
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Jan 10 18:38:05 2023 +0800

    [FLINK-30611] Expiration can be performed with missing files
    
    This closes #474.
    
    (cherry picked from commit 929a411f29f4fc76dfead6e716daae56c165724c)
---
 .../table/store/file/manifest/ManifestFile.java    |  23 ----
 .../store/file/operation/FileStoreExpireImpl.java  |  48 ++++++++-
 .../flink/table/store/file/TestFileStore.java      |  64 ++++++-----
 ...reTest.java => CleanedFileStoreExpireTest.java} |  90 +---------------
 .../file/operation/FileStoreExpireTestBase.java    | 119 +++++++++++++++++++++
 .../operation/UncleanedFileStoreExpireTest.java    |  84 +++++++++++++++
 6 files changed, 288 insertions(+), 140 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index c6dbab0b..77b26de2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -35,13 +35,8 @@ import org.apache.flink.table.store.format.FieldStatsCollector;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
-
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 
 /**
  * This file includes several {@link ManifestEntry}s, representing the additional changes since last
@@ -91,24 +86,6 @@ public class ManifestFile {
         }
     }
 
-    public Iterable<ManifestEntry> readManifestFiles(List<String> manifestFiles) {
-        Queue<String> files = new LinkedList<>(manifestFiles);
-        return Iterables.concat(
-                (Iterable<Iterable<ManifestEntry>>)
-                        () ->
-                                new Iterator<Iterable<ManifestEntry>>() {
-                                    @Override
-                                    public boolean hasNext() {
-                                        return files.size() > 0;
-                                    }
-
-                                    @Override
-                                    public Iterable<ManifestEntry> next() {
-                                        return read(files.poll());
-                                    }
-                                });
-    }
-
     /**
      * Write several {@link ManifestEntry}s into manifest files.
      *
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index 5f85cb8a..fd7012f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -29,14 +29,20 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
@@ -184,9 +190,14 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             }
 
             Snapshot toExpire = snapshotManager.snapshot(id);
+            // do not call `toExpire.readAllDataManifests` directly: a job may have been killed
+            // during expiration, in which case some manifest files are already deleted
+            List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
+            toExpireManifests.addAll(tryReadManifestList(toExpire.baseManifestList()));
+            toExpireManifests.addAll(tryReadManifestList(toExpire.deltaManifestList()));
 
             // delete manifest
-            for (ManifestFileMeta manifest : toExpire.readAllDataManifests(manifestList)) {
+            for (ManifestFileMeta manifest : toExpireManifests) {
                 if (!manifestsInUse.contains(manifest) && !deletedManifests.contains(manifest)) {
                     manifestFile.delete(manifest.fileName());
                     deletedManifests.add(manifest);
@@ -194,7 +205,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             }
             if (toExpire.changelogManifestList() != null) {
                 for (ManifestFileMeta manifest :
-                        manifestList.read(toExpire.changelogManifestList())) {
+                        tryReadManifestList(toExpire.changelogManifestList())) {
                     manifestFile.delete(manifest.fileName());
                 }
             }
@@ -260,10 +271,39 @@ public class FileStoreExpireImpl implements FileStoreExpire {
 
     private Iterable<ManifestEntry> getManifestEntriesFromManifestList(String manifestListName) {
         List<String> manifestFiles =
-                manifestList.read(manifestListName).stream()
+                tryReadManifestList(manifestListName).stream()
                         .map(ManifestFileMeta::fileName)
                         .collect(Collectors.toList());
-        return manifestFile.readManifestFiles(manifestFiles);
+        Queue<String> files = new LinkedList<>(manifestFiles);
+        return Iterables.concat(
+                (Iterable<Iterable<ManifestEntry>>)
+                        () ->
+                                new Iterator<Iterable<ManifestEntry>>() {
+                                    @Override
+                                    public boolean hasNext() {
+                                        return files.size() > 0;
+                                    }
+
+                                    @Override
+                                    public Iterable<ManifestEntry> next() {
+                                        String file = files.poll();
+                                        try {
+                                            return manifestFile.read(file);
+                                        } catch (Exception e) {
+                                            LOG.warn("Failed to read manifest file " + file, e);
+                                            return Collections.emptyList();
+                                        }
+                                    }
+                                });
+    }
+
+    private List<ManifestFileMeta> tryReadManifestList(String manifestListName) {
+        try {
+            return manifestList.read(manifestListName);
+        } catch (Exception e) {
+            LOG.warn("Failed to read manifest list file " + manifestListName, e);
+            return Collections.emptyList();
+        }
     }
 
     private void writeEarliestHint(long earliest) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 376befa8..0c61503f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -404,9 +404,6 @@ public class TestFileStore extends KeyValueFileStore {
 
     private Set<Path> getFilesInUse() {
         Set<Path> result = new HashSet<>();
-        FileStorePathFactory pathFactory = pathFactory();
-        ManifestList manifestList = manifestListFactory().create();
-        FileStoreScan scan = newScan();
 
         SchemaManager schemaManager = new SchemaManager(options.path());
         schemaManager.listAllIds().forEach(id -> result.add(schemaManager.toSchemaPath(id)));
@@ -427,35 +424,48 @@ public class TestFileStore extends KeyValueFileStore {
         }
 
         for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) {
-            Path snapshotPath = snapshotManager.snapshotPath(id);
-            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            result.addAll(getFilesInUse(id));
+        }
+        return result;
+    }
 
-            // snapshot file
-            result.add(snapshotPath);
+    public Set<Path> getFilesInUse(long snapshotId) {
+        Set<Path> result = new HashSet<>();
 
-            // manifest lists
-            result.add(pathFactory.toManifestListPath(snapshot.baseManifestList()));
-            result.add(pathFactory.toManifestListPath(snapshot.deltaManifestList()));
-            if (snapshot.changelogManifestList() != null) {
-                result.add(pathFactory.toManifestListPath(snapshot.changelogManifestList()));
-            }
+        SnapshotManager snapshotManager = snapshotManager();
+        FileStorePathFactory pathFactory = pathFactory();
+        ManifestList manifestList = manifestListFactory().create();
+        FileStoreScan scan = newScan();
 
-            // manifests
-            List<ManifestFileMeta> manifests = snapshot.readAllDataManifests(manifestList);
-            if (snapshot.changelogManifestList() != null) {
-                manifests.addAll(manifestList.read(snapshot.changelogManifestList()));
-            }
-            manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
+        Path snapshotPath = snapshotManager.snapshotPath(snapshotId);
+        Snapshot snapshot = Snapshot.fromPath(snapshotPath);
 
-            // data file
-            List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
-            for (ManifestEntry entry : entries) {
-                result.add(
-                        new Path(
-                                pathFactory.bucketPath(entry.partition(), entry.bucket()),
-                                entry.file().fileName()));
-            }
+        // snapshot file
+        result.add(snapshotPath);
+
+        // manifest lists
+        result.add(pathFactory.toManifestListPath(snapshot.baseManifestList()));
+        result.add(pathFactory.toManifestListPath(snapshot.deltaManifestList()));
+        if (snapshot.changelogManifestList() != null) {
+            result.add(pathFactory.toManifestListPath(snapshot.changelogManifestList()));
         }
+
+        // manifests
+        List<ManifestFileMeta> manifests = snapshot.readAllDataManifests(manifestList);
+        if (snapshot.changelogManifestList() != null) {
+            manifests.addAll(manifestList.read(snapshot.changelogManifestList()));
+        }
+        manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName())));
+
+        // data file
+        List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
+        for (ManifestEntry entry : entries) {
+            result.add(
+                    new Path(
+                            pathFactory.bucketPath(entry.partition(), entry.bucket()),
+                            entry.file().fileName()));
+        }
+
         return result;
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/CleanedFileStoreExpireTest.java
similarity index 68%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/CleanedFileStoreExpireTest.java
index 8c4da4c2..a03717d9 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/CleanedFileStoreExpireTest.java
@@ -21,82 +21,29 @@ package org.apache.flink.table.store.file.operation;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.TestFileStore;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link FileStoreExpireImpl}. */
-public class FileStoreExpireTest {
-
-    private TestKeyValueGenerator gen;
-    @TempDir java.nio.file.Path tempDir;
-    private TestFileStore store;
-    private SnapshotManager snapshotManager;
-
-    @BeforeEach
-    public void beforeEach() throws Exception {
-        gen = new TestKeyValueGenerator();
-        store = createStore();
-        snapshotManager = store.snapshotManager();
-        SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
-        schemaManager.commitNewVersion(
-                new UpdateSchema(
-                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
-                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
-                        TestKeyValueGenerator.getPrimaryKeys(
-                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
-                        Collections.emptyMap(),
-                        null));
-    }
-
-    private TestFileStore createStore() {
-        ThreadLocalRandom random = ThreadLocalRandom.current();
-
-        CoreOptions.ChangelogProducer changelogProducer;
-        if (random.nextBoolean()) {
-            changelogProducer = CoreOptions.ChangelogProducer.INPUT;
-        } else {
-            changelogProducer = CoreOptions.ChangelogProducer.NONE;
-        }
-
-        return new TestFileStore.Builder(
-                        "avro",
-                        tempDir.toString(),
-                        1,
-                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
-                        TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
-                        TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
-                        DeduplicateMergeFunction.factory())
-                .changelogProducer(changelogProducer)
-                .build();
-    }
+/**
+ * Tests for {@link FileStoreExpireImpl}. After expiration, only useful files should be retained.
+ */
+public class CleanedFileStoreExpireTest extends FileStoreExpireTestBase {
 
     @AfterEach
     public void afterEach() throws IOException {
@@ -265,33 +212,4 @@ public class FileStoreExpireTest {
             }
         }
     }
-
-    private void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions)
-            throws Exception {
-        for (int i = 0; i < numCommits; i++) {
-            int numRecords = ThreadLocalRandom.current().nextInt(100) + 1;
-            List<KeyValue> data = new ArrayList<>();
-            for (int j = 0; j < numRecords; j++) {
-                data.add(gen.next());
-            }
-            allData.addAll(data);
-            List<Snapshot> snapshots = store.commitData(data, gen::getPartition, kv -> 0);
-            for (int j = 0; j < snapshots.size(); j++) {
-                snapshotPositions.add(allData.size());
-            }
-        }
-    }
-
-    private void assertSnapshot(
-            int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions)
-            throws Exception {
-        Map<BinaryRowData, BinaryRowData> expected =
-                store.toKvMap(allData.subList(0, snapshotPositions.get(snapshotId - 1)));
-        List<KeyValue> actualKvs =
-                store.readKvsFromManifestEntries(
-                        store.newScan().withSnapshot(snapshotId).plan().files(), false);
-        gen.sort(actualKvs);
-        Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
-        assertThat(actual).isEqualTo(expected);
-    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTestBase.java
new file mode 100644
index 00000000..3cddfb60
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTestBase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestFileStore;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for {@link FileStoreExpireImpl}. */
+public class FileStoreExpireTestBase {
+
+    protected TestKeyValueGenerator gen;
+    @TempDir java.nio.file.Path tempDir;
+    protected TestFileStore store;
+    protected SnapshotManager snapshotManager;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        gen = new TestKeyValueGenerator();
+        store = createStore();
+        snapshotManager = store.snapshotManager();
+        SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
+        schemaManager.commitNewVersion(
+                new UpdateSchema(
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                        TestKeyValueGenerator.getPrimaryKeys(
+                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                        Collections.emptyMap(),
+                        null));
+    }
+
+    private TestFileStore createStore() {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        CoreOptions.ChangelogProducer changelogProducer;
+        if (random.nextBoolean()) {
+            changelogProducer = CoreOptions.ChangelogProducer.INPUT;
+        } else {
+            changelogProducer = CoreOptions.ChangelogProducer.NONE;
+        }
+
+        return new TestFileStore.Builder(
+                        "avro",
+                        tempDir.toString(),
+                        1,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                        DeduplicateMergeFunction.factory())
+                .changelogProducer(changelogProducer)
+                .build();
+    }
+
+    protected void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions)
+            throws Exception {
+        for (int i = 0; i < numCommits; i++) {
+            int numRecords = ThreadLocalRandom.current().nextInt(100) + 1;
+            List<KeyValue> data = new ArrayList<>();
+            for (int j = 0; j < numRecords; j++) {
+                data.add(gen.next());
+            }
+            allData.addAll(data);
+            List<Snapshot> snapshots = store.commitData(data, gen::getPartition, kv -> 0);
+            for (int j = 0; j < snapshots.size(); j++) {
+                snapshotPositions.add(allData.size());
+            }
+        }
+    }
+
+    protected void assertSnapshot(
+            int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions)
+            throws Exception {
+        Map<BinaryRowData, BinaryRowData> expected =
+                store.toKvMap(allData.subList(0, snapshotPositions.get(snapshotId - 1)));
+        List<KeyValue> actualKvs =
+                store.readKvsFromManifestEntries(
+                        store.newScan().withSnapshot(snapshotId).plan().files(), false);
+        gen.sort(actualKvs);
+        Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
+        assertThat(actual).isEqualTo(expected);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/UncleanedFileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/UncleanedFileStoreExpireTest.java
new file mode 100644
index 00000000..0f7b38df
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/UncleanedFileStoreExpireTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link FileStoreExpireImpl}. Some unused files may still remain after these tests,
+ * because the tests delete files before expiring (see {@code testExpireWithMissingFiles}).
+ */
+public class UncleanedFileStoreExpireTest extends FileStoreExpireTestBase {
+
+    @Test
+    public void testExpireWithMissingFiles() throws Exception {
+        FileStoreExpire expire = store.newExpire(1, 1, 1);
+
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+        commit(5, allData, snapshotPositions);
+
+        int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
+        Set<Path> filesInUse = store.getFilesInUse(latestSnapshotId);
+        List<Path> unusedFileList =
+                Files.walk(Paths.get(tempDir.toString()))
+                        .filter(Files::isRegularFile)
+                        .filter(p -> !p.getFileName().toString().startsWith("snapshot"))
+                        .filter(p -> !p.getFileName().toString().startsWith("schema"))
+                        .map(p -> new Path(p.toString()))
+                        .filter(p -> !filesInUse.contains(p))
+                        .collect(Collectors.toList());
+
+        // shuffle list
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        for (int i = unusedFileList.size() - 1; i > 0; i--) {
+            int j = random.nextInt(i + 1);
+            Collections.swap(unusedFileList, i, j);
+        }
+
+        // delete some unused files
+        int numFilesToDelete = random.nextInt(unusedFileList.size());
+        for (int i = 0; i < numFilesToDelete; i++) {
+            FileUtils.deleteOrWarn(unusedFileList.get(i));
+        }
+
+        expire.expire();
+
+        for (int i = 1; i < latestSnapshotId; i++) {
+            assertThat(snapshotManager.snapshotExists(i)).isFalse();
+        }
+        assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue();
+        assertSnapshot(latestSnapshotId, allData, snapshotPositions);
+    }
+}