You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/24 22:41:49 UTC
kafka git commit: MINOR: add retry to state dir locking
Repository: kafka
Updated Branches:
refs/heads/trunk 01aeea7c7 -> fa05752cc
MINOR: add retry to state dir locking
There is a possibility that the state directory locking fails when another stream thread is taking long to close all tasks. Simple retries should alleviate the problem.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #899 from ymatsuda/minor2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa05752c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa05752c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa05752c
Branch: refs/heads/trunk
Commit: fa05752ccd9ee4996f252aa5ed71f10f28e80a52
Parents: 01aeea7
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Wed Feb 24 13:41:46 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Feb 24 13:41:46 2016 -0800
----------------------------------------------------------------------
.../internals/ProcessorStateManager.java | 23 +++++++++++++++++++-
1 file changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa05752c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index d449d04..30441c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -85,7 +85,7 @@ public class ProcessorStateManager {
createStateDirectory(baseDir);
// try to acquire the exclusive lock on the state directory
- directoryLock = lockStateDirectory(baseDir);
+ directoryLock = lockStateDirectory(baseDir, 5);
if (directoryLock == null) {
throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
}
@@ -109,8 +109,27 @@ public class ProcessorStateManager {
}
public static FileLock lockStateDirectory(File stateDir) throws IOException {
+ return lockStateDirectory(stateDir, 0);
+ }
+
+ private static FileLock lockStateDirectory(File stateDir, int retry) throws IOException {
File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
+
+ FileLock lock = lockStateDirectory(channel);
+ while (lock == null && retry > 0) {
+ try {
+ Thread.sleep(200);
+ } catch (Exception ex) {
+ // do nothing
+ }
+ retry--;
+ lock = lockStateDirectory(channel);
+ }
+ return lock;
+ }
+
+ private static FileLock lockStateDirectory(FileChannel channel) throws IOException {
try {
return channel.tryLock();
} catch (OverlappingFileLockException e) {
@@ -118,6 +137,8 @@ public class ProcessorStateManager {
}
}
+
+
public File baseDir() {
return this.baseDir;
}