You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/12 14:27:17 UTC

kafka git commit: KAFKA-5038; Throw correct exception of locking of state directory fails

Repository: kafka
Updated Branches:
  refs/heads/0.10.2 5f728532a -> 863867582


KAFKA-5038; Throw correct exception of locking of state directory fails

Author: Eno Thereska <en...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #2841 from enothereska/KAFKA-5038


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86386758
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86386758
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86386758

Branch: refs/heads/0.10.2
Commit: 8638675828b63c4bae6fb85de56543d7b3088546
Parents: 5f72853
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Apr 12 15:26:37 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Apr 12 15:26:37 2017 +0100

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java          | 16 ++++++++++++----
 .../processor/internals/StateDirectory.java       |  9 ++++++++-
 .../processor/internals/StateDirectoryTest.java   | 18 ++++++++++++++++++
 3 files changed, 38 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/86386758/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 65831a2..2ef9634 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -82,7 +82,7 @@ public class ProcessorStateManager implements StateManager {
                                  final Map<String, String> storeToChangelogTopic) throws LockException, IOException {
         this.taskId = taskId;
         this.stateDirectory = stateDirectory;
-        this.baseDir  = stateDirectory.directoryForTask(taskId);
+        this.logPrefix = String.format("task [%s]", taskId);
         this.partitionForTopic = new HashMap<>();
         for (TopicPartition source : sources) {
             this.partitionForTopic.put(source.topic(), source);
@@ -96,10 +96,18 @@ public class ProcessorStateManager implements StateManager {
         this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
 
-        this.logPrefix = String.format("task [%s]", taskId);
-
         if (!stateDirectory.lock(taskId, 5)) {
-            throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
+            throw new LockException(String.format("%s Failed to lock the state directory for task %s",
+                logPrefix, taskId));
+        }
+        // get a handle on the parent/base directory of the task directory
+        // note that the parent directory could have been accidentally deleted here,
+        // so catch that exception if that is the case
+        try {
+            this.baseDir = stateDirectory.directoryForTask(taskId);
+        } catch (ProcessorStateException e) {
+            throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s",
+                logPrefix, taskId, e));
         }
 
         // load the checkpoint information

http://git-wip-us.apache.org/repos/asf/kafka/blob/86386758/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index d264b26..b081e27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -95,11 +95,18 @@ public class StateDirectory {
      * @throws IOException
      */
     public boolean lock(final TaskId taskId, int retry) throws IOException {
+        final File lockFile;
         // we already have the lock so bail out here
         if (locks.containsKey(taskId)) {
             return true;
         }
-        final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
+        try {
+            lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
+        } catch (ProcessorStateException e) {
+            // directoryForTask could be throwing an exception if another thread
+            // has concurrently deleted the directory
+            return false;
+        }
 
         final FileChannel channel;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86386758/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index fb55796..6b1d077 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -32,6 +33,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -97,6 +99,22 @@ public class StateDirectoryTest {
         assertTrue(directory.lock(taskId, 0));
     }
 
+    @Test(expected = ProcessorStateException.class)
+    public void shouldThrowProcessorStateException() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+
+        Utils.delete(stateDir);
+        directory.directoryForTask(taskId);
+    }
+
+    @Test
+    public void shouldNotLockDeletedDirectory() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+
+        Utils.delete(stateDir);
+        assertFalse(directory.lock(taskId, 0));
+    }
+
 
     @Test
     public void shouldLockMulitpleTaskDirectories() throws Exception {