You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/01/30 14:55:45 UTC
[2/5] flink git commit: [FLINK-5659] [core] Prevent concurrent
FileUtils.deleteDirectory() under Windows
[FLINK-5659] [core] Prevent concurrent FileUtils.deleteDirectory() under Windows
This serializes these types of cleanup calls under Windows, to work around the
fact that Windows exhibits unpredictable behavior under concurrent file/directory
delete operations. That in turn causes nasty exception stack traces on shutdown
operations where components like I/O manager and Blob Server concurrently clean
up their directories, which may overlap in some subdirectories.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e63d5a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e63d5a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e63d5a8
Branch: refs/heads/master
Commit: 2e63d5a8ec1fa874a61061b72b970879f14c86d9
Parents: e135c3a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jan 24 18:43:47 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Jan 30 15:54:16 2018 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/util/FileUtils.java | 125 ++++++++++++-------
.../java/org/apache/flink/util/LambdaUtil.java | 2 +-
.../flink/util/function/ThrowingConsumer.java | 7 +-
tools/maven/spotbugs-exclude.xml | 7 ++
4 files changed, 95 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d5a8/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 6d15b2e..f47825c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -21,12 +21,13 @@ package org.apache.flink.util;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.function.ThrowingConsumer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.nio.file.StandardOpenOption;
import java.util.Random;
@@ -38,6 +39,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public final class FileUtils {
+ /** Global lock to prevent concurrent directory deletes under Windows. */
+ private static final Object WINDOWS_DELETE_LOCK = new Object();
+
/** The alphabet to construct the random part of the filename from. */
private static final char[] ALPHABET =
{ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f' };
@@ -108,19 +112,7 @@ public final class FileUtils {
public static void deleteFileOrDirectory(File file) throws IOException {
checkNotNull(file, "file");
- if (file.isDirectory()) {
- // file exists and is directory
- deleteDirectory(file);
- }
- else if (file.exists()) {
- try {
- Files.delete(file.toPath());
- }
- catch (NoSuchFileException e) {
- // if the file is already gone (concurrently), we don't mind
- }
- }
- // else: already deleted
+ guardIfWindows(FileUtils::deleteFileOrDirectoryInternal, file);
}
/**
@@ -138,34 +130,7 @@ public final class FileUtils {
public static void deleteDirectory(File directory) throws IOException {
checkNotNull(directory, "directory");
- if (directory.isDirectory()) {
- // directory exists and is a directory
-
- // empty the directory first
- try {
- cleanDirectory(directory);
- }
- catch (FileNotFoundException ignored) {
- // someone concurrently deleted the directory, nothing to do for us
- return;
- }
-
- // delete the directory. this fails if the directory is not empty, meaning
- // if new files got concurrently created. we want to fail then.
- try {
- Files.delete(directory.toPath());
- }
- catch (NoSuchFileException ignored) {
- // if someone else deleted this concurrently, we don't mind
- // the result is the same for us, after all
- }
- }
- else if (directory.exists()) {
- // exists but is file, not directory
- // either an error from the caller, or concurrently a file got created
- throw new IOException(directory + " is not a directory");
- }
- // else: does not exist, which is okay (as if deleted)
+ guardIfWindows(FileUtils::deleteDirectoryInternal, directory);
}
/**
@@ -203,6 +168,49 @@ public final class FileUtils {
public static void cleanDirectory(File directory) throws IOException {
checkNotNull(directory, "directory");
+ guardIfWindows(FileUtils::cleanDirectoryInternal, directory);
+ }
+
+ private static void deleteFileOrDirectoryInternal(File file) throws IOException {
+ if (file.isDirectory()) {
+ // file exists and is directory
+ deleteDirectoryInternal(file);
+ }
+ else {
+ // if the file is already gone (concurrently), we don't mind
+ Files.deleteIfExists(file.toPath());
+ }
+ // else: already deleted
+ }
+
+ private static void deleteDirectoryInternal(File directory) throws IOException {
+ if (directory.isDirectory()) {
+ // directory exists and is a directory
+
+ // empty the directory first
+ try {
+ cleanDirectoryInternal(directory);
+ }
+ catch (FileNotFoundException ignored) {
+ // someone concurrently deleted the directory, nothing to do for us
+ return;
+ }
+
+ // delete the directory. this fails if the directory is not empty, meaning
+ // if new files got concurrently created. we want to fail then.
+ // if someone else deleted the empty directory concurrently, we don't mind
+ // the result is the same for us, after all
+ Files.deleteIfExists(directory.toPath());
+ }
+ else if (directory.exists()) {
+ // exists but is file, not directory
+ // either an error from the caller, or concurrently a file got created
+ throw new IOException(directory + " is not a directory");
+ }
+ // else: does not exist, which is okay (as if deleted)
+ }
+
+ private static void cleanDirectoryInternal(File directory) throws IOException {
if (directory.isDirectory()) {
final File[] files = directory.listFiles();
@@ -231,6 +239,39 @@ public final class FileUtils {
}
}
+ private static void guardIfWindows(ThrowingConsumer<File, IOException> toRun, File file) throws IOException {
+ if (!OperatingSystem.isWindows()) {
+ toRun.accept(file);
+ }
+ else {
+ // for windows, we synchronize on a global lock, to prevent concurrent delete issues
+ // >
+ // in the future, we may want to find either a good way of working around file visibility
+ // in Windows under concurrent operations (the behavior seems completely unpredictable)
+ // or make this locking more fine grained, for example on directory path prefixes
+ synchronized (WINDOWS_DELETE_LOCK) {
+ for (int attempt = 1; attempt <= 10; attempt++) {
+ try {
+ toRun.accept(file);
+ break;
+ }
+ catch (AccessDeniedException e) {
+ // ah, windows...
+ }
+
+ // briefly wait and fall through the loop
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ // restore the interruption flag and error out of the method
+ Thread.currentThread().interrupt();
+ throw new IOException("operation interrupted");
+ }
+ }
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// Deleting directories on Flink FileSystem abstraction
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d5a8/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
index bce403a..bbcded9 100644
--- a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
@@ -41,7 +41,7 @@ public final class LambdaUtil {
*/
public static <T> void applyToAllWhileSuppressingExceptions(
Iterable<T> inputs,
- ThrowingConsumer<T> throwingConsumer) throws Exception {
+ ThrowingConsumer<T, ? extends Exception> throwingConsumer) throws Exception {
if (inputs != null && throwingConsumer != null) {
Exception exception = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d5a8/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
index 1452071..cd1703b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
@@ -25,16 +25,17 @@ import org.apache.flink.annotation.Public;
* an exception.
*
* @param <T> type of the consumed elements.
+ * @param <E> type of the exception thrown.
*/
@Public
@FunctionalInterface
-public interface ThrowingConsumer<T> {
+public interface ThrowingConsumer<T, E extends Throwable> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
- * @throws Exception on errors during consumption
+ * @throws E on errors during consumption
*/
- void accept(T t) throws Exception;
+ void accept(T t) throws E;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d5a8/tools/maven/spotbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 3da9268..b165464 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -22,6 +22,13 @@ under the License.
<Match>
<Package name="~org\.apache\.calcite.*"/>
</Match>
+
+ <Match>
+ <!-- Intentional hack to reduce calls to the OS; locking is required for concurrency -->
+ <Bug pattern="SWL_SLEEP_WITH_LOCK_HELD"/>
+ <Class name="org.apache.flink.util.FileUtils"/>
+ <Method name="guardIfWindows"/>
+ </Match>
<Match>
<Bug pattern="VA_FORMAT_STRING_USES_NEWLINE"/>