You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/12 14:27:17 UTC
kafka git commit: KAFKA-5038;
Throw correct exception if locking of state directory fails
Repository: kafka
Updated Branches:
refs/heads/0.10.2 5f728532a -> 863867582
KAFKA-5038; Throw correct exception if locking of state directory fails
Author: Eno Thereska <en...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #2841 from enothereska/KAFKA-5038
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86386758
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86386758
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86386758
Branch: refs/heads/0.10.2
Commit: 8638675828b63c4bae6fb85de56543d7b3088546
Parents: 5f72853
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Apr 12 15:26:37 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Apr 12 15:26:37 2017 +0100
----------------------------------------------------------------------
.../internals/ProcessorStateManager.java | 16 ++++++++++++----
.../processor/internals/StateDirectory.java | 9 ++++++++-
.../processor/internals/StateDirectoryTest.java | 18 ++++++++++++++++++
3 files changed, 38 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/86386758/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 65831a2..2ef9634 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -82,7 +82,7 @@ public class ProcessorStateManager implements StateManager {
final Map<String, String> storeToChangelogTopic) throws LockException, IOException {
this.taskId = taskId;
this.stateDirectory = stateDirectory;
- this.baseDir = stateDirectory.directoryForTask(taskId);
+ this.logPrefix = String.format("task [%s]", taskId);
this.partitionForTopic = new HashMap<>();
for (TopicPartition source : sources) {
this.partitionForTopic.put(source.topic(), source);
@@ -96,10 +96,18 @@ public class ProcessorStateManager implements StateManager {
this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;
- this.logPrefix = String.format("task [%s]", taskId);
-
if (!stateDirectory.lock(taskId, 5)) {
- throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
+ throw new LockException(String.format("%s Failed to lock the state directory for task %s",
+ logPrefix, taskId));
+ }
+ // get a handle on the parent/base directory of the task directory
+ // note that the parent directory could have been accidentally deleted here,
+ // so catch that exception if that is the case
+ try {
+ this.baseDir = stateDirectory.directoryForTask(taskId);
+ } catch (ProcessorStateException e) {
+ throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s",
+ logPrefix, taskId, e));
}
// load the checkpoint information
http://git-wip-us.apache.org/repos/asf/kafka/blob/86386758/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index d264b26..b081e27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -95,11 +95,18 @@ public class StateDirectory {
* @throws IOException
*/
public boolean lock(final TaskId taskId, int retry) throws IOException {
+ final File lockFile;
// we already have the lock so bail out here
if (locks.containsKey(taskId)) {
return true;
}
- final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
+ try {
+ lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
+ } catch (ProcessorStateException e) {
+ // directoryForTask could be throwing an exception if another thread
+ // has concurrently deleted the directory
+ return false;
+ }
final FileChannel channel;
http://git-wip-us.apache.org/repos/asf/kafka/blob/86386758/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index fb55796..6b1d077 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -32,6 +33,7 @@ import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -97,6 +99,22 @@ public class StateDirectoryTest {
assertTrue(directory.lock(taskId, 0));
}
+ @Test(expected = ProcessorStateException.class)
+ public void shouldThrowProcessorStateException() throws Exception {
+ final TaskId taskId = new TaskId(0, 0);
+
+ Utils.delete(stateDir);
+ directory.directoryForTask(taskId);
+ }
+
+ @Test
+ public void shouldNotLockDeletedDirectory() throws Exception {
+ final TaskId taskId = new TaskId(0, 0);
+
+ Utils.delete(stateDir);
+ assertFalse(directory.lock(taskId, 0));
+ }
+
@Test
public void shouldLockMulitpleTaskDirectories() throws Exception {