You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/10 12:07:39 UTC

[flink] branch master updated: [FLINK-26450] Makes FileStateHandle.discardState fail if the file couldn't be deleted

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0318b26  [FLINK-26450] Makes FileStateHandle.discardState fail if the file couldn't be deleted
0318b26 is described below

commit 0318b260f71d357fea020baf8138435614b19287
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Mar 4 22:38:27 2022 +0100

    [FLINK-26450] Makes FileStateHandle.discardState fail if the file couldn't be deleted
---
 .../runtime/state/filesystem/FileStateHandle.java  |  22 +-
 .../state/filesystem/FileStateHandleTest.java      | 243 ++++++++++++++++++---
 2 files changed, 230 insertions(+), 35 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 519482d..8b94047 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -81,8 +81,26 @@ public class FileStateHandle implements StreamStateHandle {
      */
     @Override
     public void discardState() throws Exception {
-        FileSystem fs = getFileSystem();
-        fs.delete(filePath, false);
+        final FileSystem fs = getFileSystem();
+
+        IOException deleteFailure = null;
+        boolean deleted = true;
+        try {
+            deleted = fs.delete(filePath, false);
+        } catch (IOException e) {
+            deleteFailure = e;
+        }
+
+        // Only probe the file system after a failed delete: exists() can be costly (e.g. on
+        // object stores), and a failed delete of an already absent file still counts as a
+        // successful discard. The original delete error is preferred over the generic one.
+        if ((!deleted || deleteFailure != null) && fs.exists(filePath)) {
+            if (deleteFailure != null) {
+                throw deleteFailure;
+            }
+            throw new IOException(
+                    "Unknown error caused the file '" + filePath + "' to not be deleted.");
+        }
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java
index 1b6ea3c..a8ee90a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java
@@ -18,35 +18,41 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.RunnableWithException;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Random;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for the {@link FileStateHandle}. */
+@ExtendWith(TestLoggerExtension.class)
 public class FileStateHandleTest {
 
-    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private static final String TEST_SCHEME = "test";
 
-    @Test
-    public void testDisposeDeletesFile() throws Exception {
-        File file = tempFolder.newFile();
-        writeTestData(file);
-        assertTrue(file.exists());
-
-        FileStateHandle handle = new FileStateHandle(Path.fromLocalFile(file), file.length());
-        handle.discardState();
-        assertFalse(file.exists());
+    private static Path resolve(String... segments) {
+        return new Path(TEST_SCHEME + "://" + String.join("/", segments));
     }
 
     /**
@@ -57,27 +63,198 @@ public class FileStateHandleTest {
      */
     @Test
     public void testDisposeDoesNotDeleteParentDirectory() throws Exception {
-        File parentDir = tempFolder.newFolder();
-        assertTrue(parentDir.exists());
+        final Path p = resolve("path", "with", "parent");
+        final List<Path> pathsToDelete = new ArrayList<>();
 
-        File file = new File(parentDir, "test");
-        writeTestData(file);
-        assertTrue(file.exists());
+        initializeFileSystem(
+                MockedLocalFileSystem.newBuilder()
+                        .setDeleteFunction(
+                                (path, ignoredRecursionMarker) -> {
+                                    pathsToDelete.add(path);
+                                    return true;
+                                })
+                        .build());
 
-        FileStateHandle handle = new FileStateHandle(Path.fromLocalFile(file), file.length());
+        final FileStateHandle handle = new FileStateHandle(p, 42);
         handle.discardState();
-        assertFalse(file.exists());
-        assertTrue(parentDir.exists());
+        assertThat(pathsToDelete)
+                .as(
+                        "Only one delete call should have happened on the actual path but not the parent.")
+                .singleElement()
+                .isEqualTo(p);
+    }
+
+    @Test
+    public void testDiscardStateForNonExistingFileWithoutAnErrorBeingExposedByTheFileSystem()
+            throws Exception {
+        testDiscardStateForNonExistingFile(
+                MockedLocalFileSystem.newBuilder()
+                        // deletion call was successful
+                        .setDeleteFunction((ignoredPath, ignoredRecursionMarker) -> true)
+                        .setExistsFunction(
+                                path ->
+                                        fail(
+                                                "The exists call should not have been triggered. This call "
+                                                        + "should be avoided because it might be quite "
+                                                        + "expensive in object stores."))
+                        .build());
+    }
+
+    @Test
+    public void testDiscardStateForNonExistingFileWithDeleteCallReturningFalse() throws Exception {
+        testDiscardStateForNonExistingFile(
+                MockedLocalFileSystem.newBuilder()
+                        .setDeleteFunction((ignoredPath, ignoredRecursionMarker) -> false)
+                        .setExistsFunction(path -> false)
+                        .build());
+    }
+
+    @Test
+    public void testDiscardStateForNonExistingFileWithEDeleteCallFailing() throws Exception {
+        testDiscardStateForNonExistingFile(
+                MockedLocalFileSystem.newBuilder()
+                        .setDeleteFunction(
+                                (ignoredPath, ignoredRecursionMarker) -> {
+                                    throw new IOException(
+                                            "Expected IOException caused by FileSystem.delete.");
+                                })
+                        .setExistsFunction(path -> false)
+                        .build());
+    }
+
+    private static void testDiscardStateForNonExistingFile(FileSystem fileSystem) throws Exception {
+        runInFileSystemContext(
+                fileSystem,
+                () -> {
+                    final FileStateHandle handle =
+                            new FileStateHandle(resolve("path", "to", "state"), 0);
+                    // must not throw: a missing file counts as successfully discarded
+                    handle.discardState();
+                });
+    }
+
+    @Test
+    public void testDiscardStateWithDeletionFailureThroughException() throws Exception {
+        testDiscardStateFailed(
+                MockedLocalFileSystem.newBuilder()
+                        .setDeleteFunction(
+                                (ignoredPath, ignoredRecursionMarker) -> {
+                                    throw new IOException(
+                                            "Expected IOException to simulate IO error.");
+                                })
+                        .setExistsFunction(path -> true)
+                        .build());
+    }
+
+    @Test
+    public void testDiscardStateWithDeletionFailureThroughReturnValue() throws Exception {
+        testDiscardStateFailed(
+                MockedLocalFileSystem.newBuilder()
+                        .setDeleteFunction((ignoredPath, ignoredRecursionMarker) -> false)
+                        .setExistsFunction(path -> true)
+                        .build());
+    }
+
+    private static void testDiscardStateFailed(FileSystem fileSystem) throws Exception {
+        runInFileSystemContext(
+                fileSystem,
+                () -> {
+                    final FileStateHandle handle =
+                            new FileStateHandle(resolve("path", "to", "state"), 0);
+                    assertThrows(IOException.class, handle::discardState);
+                });
+    }
+
+    private static void runInFileSystemContext(
+            FileSystem fileSystem, RunnableWithException testCallback) throws Exception {
+        initializeFileSystem(fileSystem);
+
+        try {
+            testCallback.run();
+        } finally {
+            FileSystem.initialize(new Configuration(), null);
+        }
+    }
+
+    private static void initializeFileSystem(FileSystem fileSystem) {
+        final Map<Class<?>, Iterator<?>> fileSystemPlugins = new HashMap<>();
+        fileSystemPlugins.put(
+                FileSystemFactory.class,
+                Collections.singletonList(new TestingFileSystemFactory(TEST_SCHEME, fileSystem))
+                        .iterator());
+
+        FileSystem.initialize(new Configuration(), new TestingPluginManager(fileSystemPlugins));
     }
 
-    private static void writeTestData(File file) throws IOException {
-        final Random rnd = new Random();
+    private static class MockedLocalFileSystem extends LocalFileSystem {
+
+        private final BiFunctionWithException<Path, Boolean, Boolean, IOException> deleteFunction;
+        private final FunctionWithException<Path, Boolean, IOException> existsFunction;
+
+        public MockedLocalFileSystem(
+                BiFunctionWithException<Path, Boolean, Boolean, IOException> deleteFunction,
+                FunctionWithException<Path, Boolean, IOException> existsFunction) {
+            this.deleteFunction = deleteFunction;
+            this.existsFunction = existsFunction;
+        }
+
+        @Override
+        public boolean delete(Path f, boolean recursive) throws IOException {
+            return deleteFunction.apply(f, recursive);
+        }
+
+        @Override
+        public boolean exists(Path f) throws IOException {
+            return existsFunction.apply(f);
+        }
+
+        public static Builder newBuilder() {
+            return new Builder();
+        }
 
-        byte[] data = new byte[rnd.nextInt(1024) + 1];
-        rnd.nextBytes(data);
+        private static class Builder {
+
+            private BiFunctionWithException<Path, Boolean, Boolean, IOException> deleteFunction =
+                    (ignoredPath, ignoredRecursionMarker) -> true;
+            private FunctionWithException<Path, Boolean, IOException> existsFunction =
+                    ignoredPath -> true;
+
+            public Builder setDeleteFunction(
+                    BiFunctionWithException<Path, Boolean, Boolean, IOException> deleteFunction) {
+                this.deleteFunction = deleteFunction;
+                return this;
+            }
+
+            public Builder setExistsFunction(
+                    FunctionWithException<Path, Boolean, IOException> existsFunction) {
+                this.existsFunction = existsFunction;
+                return this;
+            }
+
+            public MockedLocalFileSystem build() {
+                return new MockedLocalFileSystem(deleteFunction, existsFunction);
+            }
+        }
+    }
+
+    private static class TestingFileSystemFactory implements FileSystemFactory {
+
+        private final String scheme;
+        private final FileSystem fileSystem;
+
+        public TestingFileSystemFactory(String scheme, FileSystem fileSystem) {
+            this.scheme = scheme;
+            this.fileSystem = fileSystem;
+        }
+
+        @Override
+        public String getScheme() {
+            return scheme;
+        }
 
-        try (OutputStream out = new FileOutputStream(file)) {
-            out.write(data);
+        @Override
+        public FileSystem create(URI fsUri) throws IOException {
+            return fileSystem;
         }
     }
 }