You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/07 11:12:43 UTC
[3/3] incubator-flink git commit: [FLINK-1215] Fix spurious failures
when creating output directories due to I/O races
[FLINK-1215] Fix spurious failures when creating output directories due to I/O races
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e5804971
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e5804971
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e5804971
Branch: refs/heads/master
Commit: e58049711e4275d86197223b7efcb47d2f801244
Parents: ef9a373
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 20:33:38 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 20:36:46 2014 +0100
----------------------------------------------------------------------
.../org/apache/flink/core/fs/FileSystem.java | 62 +++++++++++++++-----
1 file changed, 46 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5804971/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 1b7b91e..cb3a751 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -517,24 +517,54 @@ public abstract class FileSystem {
if (createDirectory) {
// Output directory needs to be created
- try {
- if(!this.exists(outPath)) {
- this.mkdirs(outPath);
+
+ // NOTE: we sometimes see this code block fail due to a race:
+ // - the check whether the directory exists returns false
+ // - the call to create the directory fails (some concurrent thread is creating the directory)
+ // - the call to check whether the directory exists does not yet see the new directory
+
+ // try for 30 seconds
+ long now = System.currentTimeMillis();
+ long deadline = now + 30000;
+
+ do {
+ try {
+ if(!this.exists(outPath)) {
+ this.mkdirs(outPath);
+ }
}
- } catch(IOException ioe) {
- // Some other thread might already have created the directory.
- // If - for some other reason - the directory could not be created
- // and the path does not exist, this will be handled later.
- }
-
- // double check that the output directory exists
- try {
- FileStatus check = getFileStatus(outPath);
- return check.isDir();
- } catch (FileNotFoundException e) {
- return false;
+ catch (IOException ioe) {
+ // Some other thread might already have created the directory.
+ // If - for some other reason - the directory could not be created
+ // and the path does not exist, this will be handled later.
+ }
+
+ // double check that the output directory exists
+ try {
+ FileStatus check = getFileStatus(outPath);
+ if (check != null) {
+ if (check.isDir()) {
+ return true;
+ } else {
+ throw new IOException("FileSystem should create an output directory, but the path points to a file instead.");
+ }
+ }
+ // else: fall through the loop
+ }
+ catch (FileNotFoundException e) {
+ // fall though the loop
+ }
+
+ // delay to allow other threads to make progress in the I/O calls
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {}
}
- } else {
+ while (System.currentTimeMillis() < deadline);
+
+ return false;
+ }
+ else {
// check that the output path does not exist and an output file can be created by the output format.
return !this.exists(outPath);
}