You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/07/11 18:20:23 UTC
[kafka] branch 2.6 updated: KAFKA-10262: Ensure that creating task directory is thread safe (#9010)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 25e8557 KAFKA-10262: Ensure that creating task directory is thread safe (#9010)
25e8557 is described below
commit 25e855723ed999fa5ed6f96f90e69d22bfcb153f
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sat Jul 11 11:11:06 2020 -0700
KAFKA-10262: Ensure that creating task directory is thread safe (#9010)
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../processor/internals/StateDirectory.java | 18 +++++---
.../processor/internals/StateDirectoryTest.java | 50 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 0a1e715c..30dd2ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -47,10 +47,10 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHEC
public class StateDirectory {
private static final Pattern PATH_NAME = Pattern.compile("\\d+_\\d+");
-
- static final String LOCK_FILE_NAME = ".lock";
private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
+ static final String LOCK_FILE_NAME = ".lock";
+ private final Object taskDirCreationLock = new Object();
private final Time time;
private final String appId;
private final File stateDir;
@@ -107,9 +107,17 @@ public class StateDirectory {
*/
public File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
- if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {
- throw new ProcessorStateException(
- String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
+ if (hasPersistentStores && !taskDir.exists()) {
+ synchronized (taskDirCreationLock) {
+ // to avoid a race condition, we need to check again if the directory does not exist:
+ // otherwise, two threads might pass the outer `if` (and enter the `then` block),
+ // one blocks on `synchronized` while the other creates the directory,
+ // and the blocking one fails when trying to create it after it's unblocked
+ if (!taskDir.exists() && !taskDir.mkdir()) {
+ throw new ProcessorStateException(
+ String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
+ }
+ }
}
return taskDir;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index d1adbe9..1645ea8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -34,6 +34,7 @@ import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
@@ -41,6 +42,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -49,6 +51,7 @@ import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_F
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -507,4 +510,51 @@ public class StateDirectoryTest {
initializeStateDirectory(false);
assertTrue(directory.lockGlobalState());
}
+
+ @Test
+ public void shouldNotFailWhenCreatingTaskDirectoryInParallel() throws Exception {
+ final TaskId taskId = new TaskId(0, 0);
+ final AtomicBoolean passed = new AtomicBoolean(true);
+
+ final CreateTaskDirRunner runner = new CreateTaskDirRunner(directory, taskId, passed);
+
+ final Thread t1 = new Thread(runner);
+ final Thread t2 = new Thread(runner);
+
+ t1.start();
+ t2.start();
+
+ t1.join(Duration.ofMillis(500L).toMillis());
+ t2.join(Duration.ofMillis(500L).toMillis());
+
+ assertNotNull(runner.taskDirectory);
+ assertTrue(passed.get());
+ assertTrue(runner.taskDirectory.exists());
+ assertTrue(runner.taskDirectory.isDirectory());
+ }
+
+ private static class CreateTaskDirRunner implements Runnable {
+ private final StateDirectory directory;
+ private final TaskId taskId;
+ private final AtomicBoolean passed;
+
+ private File taskDirectory;
+
+ private CreateTaskDirRunner(final StateDirectory directory,
+ final TaskId taskId,
+ final AtomicBoolean passed) {
+ this.directory = directory;
+ this.taskId = taskId;
+ this.passed = passed;
+ }
+
+ @Override
+ public void run() {
+ try {
+ taskDirectory = directory.directoryForTask(taskId);
+ } catch (final ProcessorStateException error) {
+ passed.set(false);
+ }
+ }
+ }
}
\ No newline at end of file