You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/01/30 14:55:45 UTC

[2/5] flink git commit: [FLINK-5659] [core] Prevent concurrent FileUtils.deleteDirectory() under Windows

[FLINK-5659] [core] Prevent concurrent FileUtils.deleteDirectory() under Windows

This serializes these types of cleanup calls under Windows, to work around the
fact that Windows exhibits unpredictable behavior under concurrent file/directory
delete operations. That in turn causes nasty exception stack traces on shutdown
operations where components like I/O manager and Blob Server concurrently clean
up their directories, which may overlap in some subdirectories.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e63d5a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e63d5a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e63d5a8

Branch: refs/heads/master
Commit: 2e63d5a8ec1fa874a61061b72b970879f14c86d9
Parents: e135c3a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jan 24 18:43:47 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Jan 30 15:54:16 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/util/FileUtils.java   | 125 ++++++++++++-------
 .../java/org/apache/flink/util/LambdaUtil.java  |   2 +-
 .../flink/util/function/ThrowingConsumer.java   |   7 +-
 tools/maven/spotbugs-exclude.xml                |   7 ++
 4 files changed, 95 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d5a8/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 6d15b2e..f47825c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -21,12 +21,13 @@ package org.apache.flink.util;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
 import java.nio.file.StandardOpenOption;
 import java.util.Random;
 
@@ -38,6 +39,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public final class FileUtils {
 
+	/** Global lock to prevent concurrent directory deletes under Windows. */
+	private static final Object WINDOWS_DELETE_LOCK = new Object();
+
 	/** The alphabet to construct the random part of the filename from. */
 	private static final char[] ALPHABET =
 			{ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f' };
@@ -108,19 +112,7 @@ public final class FileUtils {
 	public static void deleteFileOrDirectory(File file) throws IOException {
 		checkNotNull(file, "file");
 
-		if (file.isDirectory()) {
-			// file exists and is directory
-			deleteDirectory(file);
-		}
-		else if (file.exists()) {
-			try {
-				Files.delete(file.toPath());
-			}
-			catch (NoSuchFileException e) {
-				// if the file is already gone (concurrently), we don't mind
-			}
-		}
-		// else: already deleted
+		guardIfWindows(FileUtils::deleteFileOrDirectoryInternal, file);
 	}
 
 	/**
@@ -138,34 +130,7 @@ public final class FileUtils {
 	public static void deleteDirectory(File directory) throws IOException {
 		checkNotNull(directory, "directory");
 
-		if (directory.isDirectory()) {
-			// directory exists and is a directory
-
-			// empty the directory first
-			try {
-				cleanDirectory(directory);
-			}
-			catch (FileNotFoundException ignored) {
-				// someone concurrently deleted the directory, nothing to do for us
-				return;
-			}
-
-			// delete the directory. this fails if the directory is not empty, meaning
-			// if new files got concurrently created. we want to fail then.
-			try {
-				Files.delete(directory.toPath());
-			}
-			catch (NoSuchFileException ignored) {
-				// if someone else deleted this concurrently, we don't mind
-				// the result is the same for us, after all
-			}
-		}
-		else if (directory.exists()) {
-			// exists but is file, not directory
-			// either an error from the caller, or concurrently a file got created
-			throw new IOException(directory + " is not a directory");
-		}
-		// else: does not exist, which is okay (as if deleted)
+		guardIfWindows(FileUtils::deleteDirectoryInternal, directory);
 	}
 
 	/**
@@ -203,6 +168,49 @@ public final class FileUtils {
 	public static void cleanDirectory(File directory) throws IOException {
 		checkNotNull(directory, "directory");
 
+		guardIfWindows(FileUtils::cleanDirectoryInternal, directory);
+	}
+
+	private static void deleteFileOrDirectoryInternal(File file) throws IOException {
+		if (file.isDirectory()) {
+			// file exists and is directory
+			deleteDirectoryInternal(file);
+		}
+		else {
+			// if the file is already gone (concurrently), we don't mind
+			Files.deleteIfExists(file.toPath());
+		}
+		// else: already deleted
+	}
+
+	private static void deleteDirectoryInternal(File directory) throws IOException {
+		if (directory.isDirectory()) {
+			// directory exists and is a directory
+
+			// empty the directory first
+			try {
+				cleanDirectoryInternal(directory);
+			}
+			catch (FileNotFoundException ignored) {
+				// someone concurrently deleted the directory, nothing to do for us
+				return;
+			}
+
+			// delete the directory. this fails if the directory is not empty, meaning
+			// if new files got concurrently created. we want to fail then.
+			// if someone else deleted the empty directory concurrently, we don't mind
+			// the result is the same for us, after all
+			Files.deleteIfExists(directory.toPath());
+		}
+		else if (directory.exists()) {
+			// exists but is file, not directory
+			// either an error from the caller, or concurrently a file got created
+			throw new IOException(directory + " is not a directory");
+		}
+		// else: does not exist, which is okay (as if deleted)
+	}
+
+	private static void cleanDirectoryInternal(File directory) throws IOException {
 		if (directory.isDirectory()) {
 			final File[] files = directory.listFiles();
 
@@ -231,6 +239,39 @@ public final class FileUtils {
 		}
 	}
 
+	private static void guardIfWindows(ThrowingConsumer<File, IOException> toRun, File file) throws IOException {
+		if (!OperatingSystem.isWindows()) {
+			toRun.accept(file);
+		}
+		else {
+			// for windows, we synchronize on a global lock, to prevent concurrent delete issues
+			// >
+			// in the future, we may want to find either a good way of working around file visibility
+			// in Windows under concurrent operations (the behavior seems completely unpredictable)
+			// or  make this locking more fine grained, for example  on directory path prefixes
+			synchronized (WINDOWS_DELETE_LOCK) {
+				for (int attempt = 1; attempt <= 10; attempt++) {
+					try {
+						toRun.accept(file);
+						break;
+					}
+					catch (AccessDeniedException e) {
+						// ah, windows...
+					}
+
+					// briefly wait and fall through the loop
+					try {
+						Thread.sleep(1);
+					} catch (InterruptedException e) {
+						// restore the interruption flag and error out of the method
+						Thread.currentThread().interrupt();
+						throw new IOException("operation interrupted");
+					}
+				}
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Deleting directories on Flink FileSystem abstraction
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d5a8/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
index bce403a..bbcded9 100644
--- a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
@@ -41,7 +41,7 @@ public final class LambdaUtil {
 	 */
 	public static <T> void applyToAllWhileSuppressingExceptions(
 		Iterable<T> inputs,
-		ThrowingConsumer<T> throwingConsumer) throws Exception {
+		ThrowingConsumer<T, ? extends Exception> throwingConsumer) throws Exception {
 
 		if (inputs != null && throwingConsumer != null) {
 			Exception exception = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d5a8/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
index 1452071..cd1703b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
@@ -25,16 +25,17 @@ import org.apache.flink.annotation.Public;
  * an exception.
  *
  * @param <T> type of the consumed elements.
+ * @param <E> type of the exception thrown.
  */
 @Public
 @FunctionalInterface
-public interface ThrowingConsumer<T> {
+public interface ThrowingConsumer<T, E extends Throwable> {
 
 	/**
 	 * Performs this operation on the given argument.
 	 *
 	 * @param t the input argument
-	 * @throws Exception on errors during consumption
+	 * @throws E on errors during consumption
 	 */
-	void accept(T t) throws Exception;
+	void accept(T t) throws E;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2e63d5a8/tools/maven/spotbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 3da9268..b165464 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -22,6 +22,13 @@ under the License.
 	<Match>
     	<Package name="~org\.apache\.calcite.*"/>
   	</Match>
+
+	<Match>
+		<!-- Intentional hack to reduce calls to the OS; locking is required for concurrency -->
+		<Bug pattern="SWL_SLEEP_WITH_LOCK_HELD"/>
+		<Class name="org.apache.flink.util.FileUtils"/>
+		<Method name="guardIfWindows"/>
+	</Match>
   	
 	<Match>
 		<Bug pattern="VA_FORMAT_STRING_USES_NEWLINE"/>