You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/19 09:12:43 UTC

[2/4] flink git commit: [hotfix][checkpointing] Double check if local state directory exists to avoid problem with concurrent directory creation.

[hotfix][checkpointing] Double check if local state directory exists to avoid problem with concurrent directory creation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a455d6a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a455d6a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a455d6a7

Branch: refs/heads/master
Commit: a455d6a77e5678140bc4e19c75c29905b25cbe0b
Parents: 777cc1a
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 10:50:24 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200

----------------------------------------------------------------------
 .../state/TaskExecutorLocalStateStoresManager.java       | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a455d6a7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index e7a7d8f..518ad81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -83,12 +83,13 @@ public class TaskExecutorLocalStateStoresManager {
 		this.closed = false;
 
 		for (File localStateRecoveryRootDir : localStateRootDirectories) {
-			if (!localStateRecoveryRootDir.exists()) {
 
-				if (!localStateRecoveryRootDir.mkdirs()) {
-					throw new IOException("Could not create root directory for local recovery: " +
-						localStateRecoveryRootDir);
-				}
+			if (!localStateRecoveryRootDir.exists()
+				&& !localStateRecoveryRootDir.mkdirs()
+				// we double check for exists in case another task created the directory concurrently.
+				&& !localStateRecoveryRootDir.exists()) {
+				throw new IOException("Could not create root directory for local recovery: " +
+					localStateRecoveryRootDir);
 			}
 		}