You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/10 12:07:39 UTC
[flink] branch master updated: [FLINK-26450] Makes FileStateHandle.discardState fail if the file couldn't be deleted
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0318b26 [FLINK-26450] Makes FileStateHandle.discardState fail if the file couldn't be deleted
0318b26 is described below
commit 0318b260f71d357fea020baf8138435614b19287
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Mar 4 22:38:27 2022 +0100
[FLINK-26450] Makes FileStateHandle.discardState fail if the file couldn't be deleted
---
.../runtime/state/filesystem/FileStateHandle.java | 22 +-
.../state/filesystem/FileStateHandleTest.java | 243 ++++++++++++++++++---
2 files changed, 230 insertions(+), 35 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 519482d..8b94047 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -81,8 +81,26 @@ public class FileStateHandle implements StreamStateHandle {
*/
@Override
public void discardState() throws Exception {
- FileSystem fs = getFileSystem();
- fs.delete(filePath, false);
+ final FileSystem fs = getFileSystem();
+
+ IOException deleteFailure = null;
+ boolean deleted = false;
+ try {
+ deleted = fs.delete(filePath, false);
+ } catch (IOException e) {
+ deleteFailure = e;
+ }
+
+ // Only probe exists() after a failed delete, since the call can be expensive on
+ // object stores. A file that is already gone counts as discarded; otherwise the
+ // original delete failure (if any) is rethrown so its cause is not lost.
+ if (!deleted && fs.exists(filePath)) {
+ if (deleteFailure != null) {
+ throw deleteFailure;
+ }
+ throw new IOException(
+ "Unknown error caused the file '" + filePath + "' to not be deleted.");
+ }
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java
index 1b6ea3c..a8ee90a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java
@@ -18,35 +18,41 @@
package org.apache.flink.runtime.state.filesystem;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.RunnableWithException;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Random;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
/** Tests for the {@link FileStateHandle}. */
+@ExtendWith(TestLoggerExtension.class)
public class FileStateHandleTest {
- @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+ private static final String TEST_SCHEME = "test";
- @Test
- public void testDisposeDeletesFile() throws Exception {
- File file = tempFolder.newFile();
- writeTestData(file);
- assertTrue(file.exists());
-
- FileStateHandle handle = new FileStateHandle(Path.fromLocalFile(file), file.length());
- handle.discardState();
- assertFalse(file.exists());
+ private static Path resolve(String... segments) {
+ return new Path(TEST_SCHEME + "://" + String.join("/", segments));
}
/**
@@ -57,27 +63,198 @@ public class FileStateHandleTest {
*/
@Test
public void testDisposeDoesNotDeleteParentDirectory() throws Exception {
- File parentDir = tempFolder.newFolder();
- assertTrue(parentDir.exists());
+ final Path p = resolve("path", "with", "parent");
+ final List<Path> pathsToDelete = new ArrayList<>();
- File file = new File(parentDir, "test");
- writeTestData(file);
- assertTrue(file.exists());
+ initializeFileSystem(
+ MockedLocalFileSystem.newBuilder()
+ .setDeleteFunction(
+ (path, ignoredRecursionMarker) -> {
+ pathsToDelete.add(path);
+ return true;
+ })
+ .build());
- FileStateHandle handle = new FileStateHandle(Path.fromLocalFile(file), file.length());
+ final FileStateHandle handle = new FileStateHandle(p, 42);
handle.discardState();
- assertFalse(file.exists());
- assertTrue(parentDir.exists());
+ assertThat(pathsToDelete)
+ .as(
+ "Only one delete call should have happened on the actual path but not the parent.")
+ .singleElement()
+ .isEqualTo(p);
+ }
+
+ @Test
+ public void testDiscardStateForNonExistingFileWithoutAnErrorBeingExposedByTheFileSystem()
+ throws Exception {
+ testDiscardStateForNonExistingFile(
+ MockedLocalFileSystem.newBuilder()
+ // deletion call was successful
+ .setDeleteFunction((ignoredPath, ignoredRecursionMarker) -> true)
+ .setExistsFunction(
+ path ->
+ fail(
+ "The exists call should not have been triggered. This call "
+ + "should be avoided because it might be quite "
+ + "expensive in object stores."))
+ .build());
+ }
+
+ @Test
+ public void testDiscardStateForNonExistingFileWithDeleteCallReturningFalse() throws Exception {
+ testDiscardStateForNonExistingFile(
+ MockedLocalFileSystem.newBuilder()
+ .setDeleteFunction((ignoredPath, ignoredRecursionMarker) -> false)
+ .setExistsFunction(path -> false)
+ .build());
+ }
+
+ @Test
+ public void testDiscardStateForNonExistingFileWithEDeleteCallFailing() throws Exception {
+ testDiscardStateForNonExistingFile(
+ MockedLocalFileSystem.newBuilder()
+ .setDeleteFunction(
+ (ignoredPath, ignoredRecursionMarker) -> {
+ throw new IOException(
+ "Expected IOException caused by FileSystem.delete.");
+ })
+ .setExistsFunction(path -> false)
+ .build());
+ }
+
+ private static void testDiscardStateForNonExistingFile(FileSystem fileSystem) throws Exception {
+ runInFileSystemContext(
+ fileSystem,
+ () -> {
+ final FileStateHandle handle =
+ new FileStateHandle(resolve("path", "to", "state"), 0);
+ // must complete without throwing for each mocked file system passed in
+ handle.discardState();
+ });
+ }
+
+ @Test
+ public void testDiscardStateWithDeletionFailureThroughException() throws Exception {
+ testDiscardStateFailed(
+ MockedLocalFileSystem.newBuilder()
+ .setDeleteFunction(
+ (ignoredPath, ignoredRecursionMarker) -> {
+ throw new IOException(
+ "Expected IOException to simulate IO error.");
+ })
+ .setExistsFunction(path -> true)
+ .build());
+ }
+
+ @Test
+ public void testDiscardStateWithDeletionFailureThroughReturnValue() throws Exception {
+ testDiscardStateFailed(
+ MockedLocalFileSystem.newBuilder()
+ .setDeleteFunction((ignoredPath, ignoredRecursionMarker) -> false)
+ .setExistsFunction(path -> true)
+ .build());
+ }
+
+ private static void testDiscardStateFailed(FileSystem fileSystem) throws Exception {
+ runInFileSystemContext(
+ fileSystem,
+ () -> {
+ final FileStateHandle handle =
+ new FileStateHandle(resolve("path", "to", "state"), 0);
+ assertThrows(IOException.class, handle::discardState);
+ });
+ }
+
+ private static void runInFileSystemContext(
+ FileSystem fileSystem, RunnableWithException testCallback) throws Exception {
+ initializeFileSystem(fileSystem);
+
+ try {
+ testCallback.run();
+ } finally {
+ FileSystem.initialize(new Configuration(), null);
+ }
+ }
+
+ private static void initializeFileSystem(FileSystem fileSystem) {
+ final Map<Class<?>, Iterator<?>> fileSystemPlugins = new HashMap<>();
+ fileSystemPlugins.put(
+ FileSystemFactory.class,
+ Collections.singletonList(new TestingFileSystemFactory(TEST_SCHEME, fileSystem))
+ .iterator());
+
+ FileSystem.initialize(new Configuration(), new TestingPluginManager(fileSystemPlugins));
}
- private static void writeTestData(File file) throws IOException {
- final Random rnd = new Random();
+ private static class MockedLocalFileSystem extends LocalFileSystem {
+
+ private final BiFunctionWithException<Path, Boolean, Boolean, IOException> deleteFunction;
+ private final FunctionWithException<Path, Boolean, IOException> existsFunction;
+
+ public MockedLocalFileSystem(
+ BiFunctionWithException<Path, Boolean, Boolean, IOException> deleteFunction,
+ FunctionWithException<Path, Boolean, IOException> existsFunction) {
+ this.deleteFunction = deleteFunction;
+ this.existsFunction = existsFunction;
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ return deleteFunction.apply(f, recursive);
+ }
+
+ @Override
+ public boolean exists(Path f) throws IOException {
+ return existsFunction.apply(f);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
- byte[] data = new byte[rnd.nextInt(1024) + 1];
- rnd.nextBytes(data);
+ private static class Builder {
+
+ private BiFunctionWithException<Path, Boolean, Boolean, IOException> deleteFunction =
+ (ignoredPath, ignoredRecursionMarker) -> true;
+ private FunctionWithException<Path, Boolean, IOException> existsFunction =
+ ignoredPath -> true;
+
+ public Builder setDeleteFunction(
+ BiFunctionWithException<Path, Boolean, Boolean, IOException> deleteFunction) {
+ this.deleteFunction = deleteFunction;
+ return this;
+ }
+
+ public Builder setExistsFunction(
+ FunctionWithException<Path, Boolean, IOException> existsFunction) {
+ this.existsFunction = existsFunction;
+ return this;
+ }
+
+ public MockedLocalFileSystem build() {
+ return new MockedLocalFileSystem(deleteFunction, existsFunction);
+ }
+ }
+ }
+
+ private static class TestingFileSystemFactory implements FileSystemFactory {
+
+ private final String scheme;
+ private final FileSystem fileSystem;
+
+ public TestingFileSystemFactory(String scheme, FileSystem fileSystem) {
+ this.scheme = scheme;
+ this.fileSystem = fileSystem;
+ }
+
+ @Override
+ public String getScheme() {
+ return scheme;
+ }
- try (OutputStream out = new FileOutputStream(file)) {
- out.write(data);
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ return fileSystem;
}
}
}