You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/10 16:24:56 UTC

[6/7] incubator-flink git commit: [FLINK-1215] Increase robustness to spurious failures when creating output directories

[FLINK-1215] Increase robustness to spurious failures when creating output directories


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/81c5b2ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/81c5b2ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/81c5b2ad

Branch: refs/heads/master
Commit: 81c5b2adee778532634e52b7915ec682bb939a15
Parents: 4b75d83
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 7 14:03:53 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 10 11:59:56 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 169 ++++++++++---------
 .../test/recordJobTests/TPCHQuery10ITCase.java  |   2 +-
 2 files changed, 94 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/81c5b2ad/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index cb3a751..91780fd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -458,85 +458,93 @@ public abstract class FileSystem {
 	 * @throws IOException
 	 */
 	public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
-		if(this.isDistributedFS()) {
+		if (this.isDistributedFS()) {
 			return false;
 		}
 		
-		FileStatus status = null;
-		try {
-			status = getFileStatus(outPath);
-		}
-		catch (FileNotFoundException e) {
-			// okay, the file is not there
-		}
+		// NOTE: we sometimes see this code block fail due to races, because changes to the file system may take a short time before becoming
+		//       visible to other threads. For example:
+		// - the check whether the directory exists returns false
+		// - the call to create the directory fails (some concurrent thread is creating the directory, locked)
+		// - the call to check whether the directory exists does not yet see the new directory (change is not committed)
 		
-		// check if path exists
-		if (status != null) {
-			// path exists, check write mode
-			switch (writeMode) {
-			case NO_OVERWRITE:
-				if (status.isDir() && createDirectory) {
-					return true;
-				} else {
-					// file may not be overwritten
-					throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + 
-							WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + 
-							" mode to overwrite existing files and directories.");
-				}
-
-			case OVERWRITE:
-				if (status.isDir()) {
-					if (createDirectory) {
-						// directory exists and does not need to be created
+		// try for 30 seconds
+		final long now = System.currentTimeMillis();
+		final long deadline = now + 30000;
+		
+		Exception lastError = null;
+		
+		do {
+			FileStatus status = null;
+			try {
+				status = getFileStatus(outPath);
+			}
+			catch (FileNotFoundException e) {
+				// okay, the file is not there
+			}
+			
+			// check if path exists
+			if (status != null) {
+				// path exists, check write mode
+				switch (writeMode) {
+				case NO_OVERWRITE:
+					if (status.isDir() && createDirectory) {
 						return true;
 					} else {
-						// we will write in a single file, delete directory (there is also no other thread trying to delete the directory).
-						try {
-							this.delete(outPath, true);
-						} catch(IOException ioe) {
-							// due to races in some file systems, it may spuriously occur that a deleted the file looks
-							// as if it still exists and is gone a millisecond later, once the change is committed
-							// we ignore the exception
+						// file may not be overwritten
+						throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + 
+								WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + 
+								" mode to overwrite existing files and directories.");
+					}
+	
+				case OVERWRITE:
+					if (status.isDir()) {
+						if (createDirectory) {
+							// directory exists and does not need to be created
+							return true;
+						} else {
+							// we will write in a single file, delete directory
+							// (there is also no other thread trying to delete the directory, since there is only one writer).
+							try {
+								this.delete(outPath, true);
+							}
+							catch (IOException e) {
+								// due to races in some file systems, it may spuriously occur that a deleted file looks
+								// as if it still exists and is gone a millisecond later, once the change is committed
+								// we ignore the exception, possibly fall through the loop later
+								lastError = e;
+							}
 						}
 					}
-				} else {
-					// delete file
-					try {
-						this.delete(outPath, false);
-					} catch(IOException ioe) {
-						// Some other thread might already have deleted the file.
-						// If - for some other reason - the file could not be deleted,  
-						// the error will be handled later.
+					else {
+						// delete file
+						try {
+							this.delete(outPath, false);
+						}
+						catch (IOException e) {
+							// Some other thread might already have deleted the file.
+							// If - for some other reason - the file could not be deleted,  
+							// the error will be handled later.
+							lastError = e;
+						}
 					}
+					break;
+				default:
+					throw new IllegalArgumentException("Invalid write mode: " + writeMode);
 				}
-				break;
-			default:
-				throw new IllegalArgumentException("Invalid write mode: "+writeMode);
 			}
-		}
-		
-		if (createDirectory) {
-			// Output directory needs to be created
 			
-			// NOTE: we sometimes see this code block fail due to a race:
-			// - the check whether the directory exists returns false
-			// - the call to create the directory fails (some concurrent thread is creating the directory)
-			// - the call to check whether the directory exists does not yet see the new directory
-			
-			// try for 30 seconds
-			long now = System.currentTimeMillis();
-			long deadline = now + 30000;
-			
-			do {
+			if (createDirectory) {
+				// Output directory needs to be created
+				
 				try {
-					if(!this.exists(outPath)) {
+					if (!this.exists(outPath)) {
 						this.mkdirs(outPath);
 					}
 				}
-				catch (IOException ioe) {
-					// Some other thread might already have created the directory.
-					// If - for some other reason - the directory could not be created  
-					// and the path does not exist, this will be handled later.
+				catch (IOException e) {
+					// Some other thread might already have created the directory concurrently.
+					lastError = e;
 				}
 		
 				// double check that the output directory exists
@@ -545,28 +553,37 @@ public abstract class FileSystem {
 					if (check != null) {
 						if (check.isDir()) {
 							return true;
-						} else {
-							throw new IOException("FileSystem should create an output directory, but the path points to a file instead.");
+						}
+						else {
+							lastError = new IOException("FileSystem should create an output directory, but the path points to a file instead.");
 						}
 					}
-					// else: fall through the loop
+					// fall through the loop
 				}
 				catch (FileNotFoundException e) {
 					// fall though the loop
 				}
-				
-				// delay to allow other threads to make progress in the I/O calls
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException ie) {}
+					
+			}
+			else {
+				// check that the output path does not exist and an output file can be created by the output format.
+				return !this.exists(outPath);
 			}
-			while (System.currentTimeMillis() < deadline);
 			
-			return false;
+			// small delay to allow changes to make progress
+			try {
+				Thread.sleep(10);
+			}
+			catch (InterruptedException e) {
+				throw new IOException("Thread was interrupted");
+			}
 		}
-		else {
-			// check that the output path does not exist and an output file can be created by the output format.
-			return !this.exists(outPath);
+		while (System.currentTimeMillis() < deadline);
+		
+		if (lastError != null) {
+			throw new IOException("File system failed to prepare output path " + outPath + " with write mode " + writeMode.name(), lastError);
+		} else {
+			return false;
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/81c5b2ad/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
index a6a9897..b8eb4d2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
@@ -194,7 +194,7 @@ public class TPCHQuery10ITCase extends RecordAPITestBase {
 		lineitemsPath = createTempFile("line_items.txt", LINEITEMS);
 		customersPath = createTempFile("customers.txt", CUSTOMERS);
 		nationsPath = createTempFile("nations.txt", NATIONS);
-		resultPath = createTempFile("result.txt", EXPECTED_RESULT);
+		resultPath = getTempDirPath("result");
 	}
 
 	@Override