You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/08 16:02:24 UTC

kafka git commit: KAFKA-4392; Handle NoSuchFileException gracefully in StateDirectory

Repository: kafka
Updated Branches:
  refs/heads/trunk e9a67a8da -> 600859e77


KAFKA-4392; Handle NoSuchFileException gracefully in StateDirectory

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #2121 from guozhangwang/K4392-race-dir-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/600859e7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/600859e7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/600859e7

Branch: refs/heads/trunk
Commit: 600859e77c8e9a93212815ac26a6ae8b778fee6f
Parents: e9a67a8
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Dec 8 15:50:24 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Dec 8 15:50:24 2016 +0000

----------------------------------------------------------------------
 .../streams/processor/internals/StateDirectory.java    | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/600859e7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 3048fba..a48ec59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.HashMap;
@@ -87,7 +88,17 @@ public class StateDirectory {
             return true;
         }
         final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
-        final FileChannel channel = getOrCreateFileChannel(taskId, lockFile.toPath());
+
+        final FileChannel channel;
+
+        try {
+            channel = getOrCreateFileChannel(taskId, lockFile.toPath());
+        } catch (NoSuchFileException e) {
+            // FileChannel.open(..) can throw NoSuchFileException when another thread is
+            // concurrently deleting the parent directory of the lock file (i.e. the directory
+            // of the taskId); in this case we return immediately, indicating that locking failed.
+            return false;
+        }
 
         FileLock lock = tryAcquireLock(channel);
         while (lock == null && retry > 0) {