You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/07/11 18:20:23 UTC

[kafka] branch 2.6 updated: KAFKA-10262: Ensure that creating task directory is thread safe (#9010)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 25e8557  KAFKA-10262: Ensure that creating task directory is thread safe (#9010)
25e8557 is described below

commit 25e855723ed999fa5ed6f96f90e69d22bfcb153f
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sat Jul 11 11:11:06 2020 -0700

    KAFKA-10262: Ensure that creating task directory is thread safe (#9010)
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../processor/internals/StateDirectory.java        | 18 +++++---
 .../processor/internals/StateDirectoryTest.java    | 50 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 0a1e715c..30dd2ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -47,10 +47,10 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHEC
 public class StateDirectory {
 
     private static final Pattern PATH_NAME = Pattern.compile("\\d+_\\d+");
-
-    static final String LOCK_FILE_NAME = ".lock";
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
+    static final String LOCK_FILE_NAME = ".lock";
 
+    private final Object taskDirCreationLock = new Object();
     private final Time time;
     private final String appId;
     private final File stateDir;
@@ -107,9 +107,17 @@ public class StateDirectory {
      */
     public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
-        if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {
-            throw new ProcessorStateException(
-                String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
+        if (hasPersistentStores && !taskDir.exists()) {
+            synchronized (taskDirCreationLock) {
+                // to avoid a race condition, we need to check again if the directory does not exist:
+                // otherwise, two threads might pass the outer `if` (and enter the `then` block),
+                // one blocks on `synchronized` while the other creates the directory,
+                // and the blocked one fails when trying to create it after it's unblocked
+                if (!taskDir.exists() && !taskDir.mkdir()) {
+                    throw new ProcessorStateException(
+                        String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
+                }
+            }
         }
         return taskDir;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index d1adbe9..1645ea8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -34,6 +34,7 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Objects;
@@ -41,6 +42,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -49,6 +51,7 @@ import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_F
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -507,4 +510,51 @@ public class StateDirectoryTest {
         initializeStateDirectory(false);
         assertTrue(directory.lockGlobalState());
     }
+
+    @Test
+    public void shouldNotFailWhenCreatingTaskDirectoryInParallel() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+        final AtomicBoolean passed = new AtomicBoolean(true);
+
+        final CreateTaskDirRunner runner = new CreateTaskDirRunner(directory, taskId, passed);
+
+        final Thread t1 = new Thread(runner);
+        final Thread t2 = new Thread(runner);
+
+        t1.start();
+        t2.start();
+
+        t1.join(Duration.ofMillis(500L).toMillis());
+        t2.join(Duration.ofMillis(500L).toMillis());
+
+        assertNotNull(runner.taskDirectory);
+        assertTrue(passed.get());
+        assertTrue(runner.taskDirectory.exists());
+        assertTrue(runner.taskDirectory.isDirectory());
+    }
+
+    private static class CreateTaskDirRunner implements Runnable {
+        private final StateDirectory directory;
+        private final TaskId taskId;
+        private final AtomicBoolean passed;
+
+        private File taskDirectory;
+
+        private CreateTaskDirRunner(final StateDirectory directory,
+                                    final TaskId taskId,
+                                    final AtomicBoolean passed) {
+            this.directory = directory;
+            this.taskId = taskId;
+            this.passed = passed;
+        }
+
+        @Override
+        public void run() {
+            try {
+                taskDirectory = directory.directoryForTask(taskId);
+            } catch (final ProcessorStateException error) {
+                passed.set(false);
+            }
+        }
+    }
 }
\ No newline at end of file