You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/07 11:12:43 UTC

[3/3] incubator-flink git commit: [FLINK-1215] Fix spurious failures when creating output directories due to I/O races

[FLINK-1215] Fix spurious failures when creating output directories due to I/O races


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e5804971
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e5804971
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e5804971

Branch: refs/heads/master
Commit: e58049711e4275d86197223b7efcb47d2f801244
Parents: ef9a373
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 20:33:38 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 20:36:46 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 62 +++++++++++++++-----
 1 file changed, 46 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5804971/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 1b7b91e..cb3a751 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -517,24 +517,54 @@ public abstract class FileSystem {
 		
 		if (createDirectory) {
 			// Output directory needs to be created
-			try {
-				if(!this.exists(outPath)) {
-					this.mkdirs(outPath);
+			
+			// NOTE: we sometimes see this code block fail due to a race:
+			// - the check whether the directory exists returns false
+			// - the call to create the directory fails (some concurrent thread is creating the directory)
+			// - the call to check whether the directory exists does not yet see the new directory
+			
+			// try for 30 seconds
+			long now = System.currentTimeMillis();
+			long deadline = now + 30000;
+			
+			do {
+				try {
+					if(!this.exists(outPath)) {
+						this.mkdirs(outPath);
+					}
 				}
-			} catch(IOException ioe) {
-				// Some other thread might already have created the directory.
-				// If - for some other reason - the directory could not be created  
-				// and the path does not exist, this will be handled later.
-			}
-	
-			// double check that the output directory exists
-			try {
-				FileStatus check = getFileStatus(outPath);
-				return check.isDir();
-			} catch (FileNotFoundException e) {
-				return false;
+				catch (IOException ioe) {
+					// Some other thread might already have created the directory.
+					// If - for some other reason - the directory could not be created  
+					// and the path does not exist, this will be handled later.
+				}
+		
+				// double check that the output directory exists
+				try {
+					FileStatus check = getFileStatus(outPath);
+					if (check != null) {
+						if (check.isDir()) {
+							return true;
+						} else {
+							throw new IOException("FileSystem should create an output directory, but the path points to a file instead.");
+						}
+					}
+					// else: fall through the loop
+				}
+				catch (FileNotFoundException e) {
+					// fall through the loop
+				}
+				
+				// delay to allow other threads to make progress in the I/O calls
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException ie) {}
 			}
-		} else {
+			while (System.currentTimeMillis() < deadline);
+			
+			return false;
+		}
+		else {
 			// check that the output path does not exist and an output file can be created by the output format.
 			return !this.exists(outPath);
 		}