You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/10 16:24:56 UTC
[6/7] incubator-flink git commit: [FLINK-1215] Increase robustness to
spurious failures when creating output directories
[FLINK-1215] Increase robustness to spurious failures when creating output directories
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/81c5b2ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/81c5b2ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/81c5b2ad
Branch: refs/heads/master
Commit: 81c5b2adee778532634e52b7915ec682bb939a15
Parents: 4b75d83
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 7 14:03:53 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 10 11:59:56 2014 +0100
----------------------------------------------------------------------
.../org/apache/flink/core/fs/FileSystem.java | 169 ++++++++++---------
.../test/recordJobTests/TPCHQuery10ITCase.java | 2 +-
2 files changed, 94 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/81c5b2ad/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index cb3a751..91780fd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -458,85 +458,93 @@ public abstract class FileSystem {
* @throws IOException
*/
public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
- if(this.isDistributedFS()) {
+ if (this.isDistributedFS()) {
return false;
}
- FileStatus status = null;
- try {
- status = getFileStatus(outPath);
- }
- catch (FileNotFoundException e) {
- // okay, the file is not there
- }
+ // NOTE: we sometimes see this code block fail due to races, because changes to the file system can take a short time before becoming
+ // visible to other threads. For example:
+ // - the check whether the directory exists returns false
+ // - the call to create the directory fails (some concurrent thread is creating the directory, locked)
+ // - the call to check whether the directory exists does not yet see the new directory (change is not committed)
- // check if path exists
- if (status != null) {
- // path exists, check write mode
- switch (writeMode) {
- case NO_OVERWRITE:
- if (status.isDir() && createDirectory) {
- return true;
- } else {
- // file may not be overwritten
- throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
- WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
- " mode to overwrite existing files and directories.");
- }
-
- case OVERWRITE:
- if (status.isDir()) {
- if (createDirectory) {
- // directory exists and does not need to be created
+ // try for 30 seconds
+ final long now = System.currentTimeMillis();
+ final long deadline = now + 30000;
+
+ Exception lastError = null;
+
+ do {
+ FileStatus status = null;
+ try {
+ status = getFileStatus(outPath);
+ }
+ catch (FileNotFoundException e) {
+ // okay, the file is not there
+ }
+
+ // check if path exists
+ if (status != null) {
+ // path exists, check write mode
+ switch (writeMode) {
+ case NO_OVERWRITE:
+ if (status.isDir() && createDirectory) {
return true;
} else {
- // we will write in a single file, delete directory (there is also no other thread trying to delete the directory).
- try {
- this.delete(outPath, true);
- } catch(IOException ioe) {
- // due to races in some file systems, it may spuriously occur that a deleted the file looks
- // as if it still exists and is gone a millisecond later, once the change is committed
- // we ignore the exception
+ // file may not be overwritten
+ throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
+ WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
+ " mode to overwrite existing files and directories.");
+ }
+
+ case OVERWRITE:
+ if (status.isDir()) {
+ if (createDirectory) {
+ // directory exists and does not need to be created
+ return true;
+ } else {
+ // we will write in a single file, delete directory
+ // (there is also no other thread trying to delete the directory, since there is only one writer).
+ try {
+ this.delete(outPath, true);
+ }
+ catch (IOException e) {
+ // due to races in some file systems, it may spuriously occur that the deleted file looks
+ // as if it still exists and is gone a millisecond later, once the change is committed
+ // we ignore the exception, possibly fall through the loop later
+ lastError = e;
+ }
}
}
- } else {
- // delete file
- try {
- this.delete(outPath, false);
- } catch(IOException ioe) {
- // Some other thread might already have deleted the file.
- // If - for some other reason - the file could not be deleted,
- // the error will be handled later.
+ else {
+ // delete file
+ try {
+ this.delete(outPath, false);
+ }
+ catch (IOException e) {
+ // Some other thread might already have deleted the file.
+ // If - for some other reason - the file could not be deleted,
+ // the error will be handled later.
+ lastError = e;
+ }
}
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid write mode: " + writeMode);
}
- break;
- default:
- throw new IllegalArgumentException("Invalid write mode: "+writeMode);
}
- }
-
- if (createDirectory) {
- // Output directory needs to be created
- // NOTE: we sometimes see this code block fail due to a race:
- // - the check whether the directory exists returns false
- // - the call to create the directory fails (some concurrent thread is creating the directory)
- // - the call to check whether the directory exists does not yet see the new directory
-
- // try for 30 seconds
- long now = System.currentTimeMillis();
- long deadline = now + 30000;
-
- do {
+ if (createDirectory) {
+ // Output directory needs to be created
+
try {
- if(!this.exists(outPath)) {
+ if (!this.exists(outPath)) {
this.mkdirs(outPath);
}
}
- catch (IOException ioe) {
- // Some other thread might already have created the directory.
- // If - for some other reason - the directory could not be created
- // and the path does not exist, this will be handled later.
+ catch (IOException e) {
+ // Some other thread might already have created the directory concurrently.
+ lastError = e;
}
// double check that the output directory exists
@@ -545,28 +553,37 @@ public abstract class FileSystem {
if (check != null) {
if (check.isDir()) {
return true;
- } else {
- throw new IOException("FileSystem should create an output directory, but the path points to a file instead.");
+ }
+ else {
+ lastError = new IOException("FileSystem should create an output directory, but the path points to a file instead.");
}
}
- // else: fall through the loop
+ // fall through the loop
}
catch (FileNotFoundException e) {
// fall though the loop
}
-
- // delay to allow other threads to make progress in the I/O calls
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {}
+
+ }
+ else {
+ // check that the output path does not exist and an output file can be created by the output format.
+ return !this.exists(outPath);
}
- while (System.currentTimeMillis() < deadline);
- return false;
+ // small delay to allow changes to make progress
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ throw new IOException("Thread was interrupted");
+ }
}
- else {
- // check that the output path does not exist and an output file can be created by the output format.
- return !this.exists(outPath);
+ while (System.currentTimeMillis() < deadline);
+
+ if (lastError != null) {
+ throw new IOException("File system failed to prepare output path " + outPath + " with write mode " + writeMode.name(), lastError);
+ } else {
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/81c5b2ad/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
index a6a9897..b8eb4d2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
@@ -194,7 +194,7 @@ public class TPCHQuery10ITCase extends RecordAPITestBase {
lineitemsPath = createTempFile("line_items.txt", LINEITEMS);
customersPath = createTempFile("customers.txt", CUSTOMERS);
nationsPath = createTempFile("nations.txt", NATIONS);
- resultPath = createTempFile("result.txt", EXPECTED_RESULT);
+ resultPath = getTempDirPath("result");
}
@Override