You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/24 22:41:49 UTC

kafka git commit: MINOR: add retry to state dir locking

Repository: kafka
Updated Branches:
  refs/heads/trunk 01aeea7c7 -> fa05752cc


MINOR: add retry to state dir locking

Locking the state directory can fail when another stream thread is taking a long time to close all of its tasks and has not yet released the lock. Simple retries should alleviate the problem.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #899 from ymatsuda/minor2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa05752c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa05752c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa05752c

Branch: refs/heads/trunk
Commit: fa05752ccd9ee4996f252aa5ed71f10f28e80a52
Parents: 01aeea7
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Wed Feb 24 13:41:46 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Feb 24 13:41:46 2016 -0800

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java        | 23 +++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fa05752c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index d449d04..30441c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -85,7 +85,7 @@ public class ProcessorStateManager {
         createStateDirectory(baseDir);
 
         // try to acquire the exclusive lock on the state directory
-        directoryLock = lockStateDirectory(baseDir);
+        directoryLock = lockStateDirectory(baseDir, 5);
         if (directoryLock == null) {
             throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
         }
@@ -109,8 +109,27 @@ public class ProcessorStateManager {
     }
 
     public static FileLock lockStateDirectory(File stateDir) throws IOException {
+        return lockStateDirectory(stateDir, 0);
+    }
+
+    private static FileLock lockStateDirectory(File stateDir, int retry) throws IOException {
         File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
         FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
+
+        FileLock lock = lockStateDirectory(channel);
+        while (lock == null && retry > 0) {
+            try {
+                Thread.sleep(200);
+            } catch (Exception ex) {
+                // do nothing
+            }
+            retry--;
+            lock = lockStateDirectory(channel);
+        }
+        return lock;
+    }
+
+    private static FileLock lockStateDirectory(FileChannel channel) throws IOException {
         try {
             return channel.tryLock();
         } catch (OverlappingFileLockException e) {
@@ -118,6 +137,8 @@ public class ProcessorStateManager {
         }
     }
 
+
+
     public File baseDir() {
         return this.baseDir;
     }