You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2024/03/07 20:38:14 UTC

(kafka) branch 3.7 updated: KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories (#15088)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 0760ca5e942 KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories (#15088)
0760ca5e942 is described below

commit 0760ca5e942eb67407b9d3a3c46620c5a3e97140
Author: sanepal <gt...@gmail.com>
AuthorDate: Mon Jan 8 14:49:48 2024 -0800

    KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories (#15088)
    
    KAFKA-16025 describes the race condition sequence in detail. When it occurs, initialization of the impacted task can block indefinitely, halting progress on that task and on every other task assigned to the same stream thread. The fix I have implemented is simple: after locking a directory at the start of rebalancing, re-check whether it is still empty and, if it is, unlock it immediately. This preserves the idempotency of the method when it coincides [...]
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../streams/processor/internals/TaskManager.java   | 12 ++++++--
 .../processor/internals/TaskManagerTest.java       | 32 +++++++++++++++++++++-
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f3603b2a316..d8e3f41c20b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1249,9 +1249,15 @@ public class TaskManager {
             try {
                 final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology);
                 if (stateDirectory.lock(id)) {
-                    lockedTaskDirectories.add(id);
-                    if (!allTasks.containsKey(id)) {
-                        log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
+                    // Check again in case the cleaner thread ran and emptied the directory
+                    if (stateDirectory.directoryForTaskIsEmpty(id)) {
+                        log.debug("Releasing lock on empty directory for task {}", id);
+                        stateDirectory.unlock(id);
+                    } else {
+                        lockedTaskDirectories.add(id);
+                        if (!allTasks.containsKey(id)) {
+                            log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
+                        }
                     }
                 }
             } catch (final TaskIdFormatException e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 21f5f9561d9..61e078868bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -1905,6 +1905,7 @@ public class TaskManagerTest {
     public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         expectLockObtainedFor(taskId01);
         expectLockFailedFor(taskId10);
+        expectDirectoryNotEmpty(taskId01);
 
         makeTaskFolders(
             taskId01.toString(),
@@ -1918,6 +1919,21 @@ public class TaskManagerTest {
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldUnlockEmptyDirsAtRebalanceStart() throws Exception {
+        expectLockObtainedFor(taskId01, taskId10);
+        expectDirectoryNotEmpty(taskId01);
+        expect(stateDirectory.directoryForTaskIsEmpty(taskId10)).andReturn(true);
+        expectUnlockFor(taskId10);
+
+        makeTaskFolders(taskId01.toString(), taskId10.toString());
+        replay(stateDirectory);
+        taskManager.handleRebalanceStart(singleton("topic"));
+
+        verify(stateDirectory);
+        assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
+    }
+
     @Test
     public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
         final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
@@ -1951,6 +1967,7 @@ public class TaskManagerTest {
     @Test
     public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
         expectLockObtainedFor(taskId00, taskId01, taskId02);
+        expectDirectoryNotEmpty(taskId00, taskId01, taskId02);
         expectUnlockFor(taskId02);
 
         makeTaskFolders(
@@ -1993,6 +2010,7 @@ public class TaskManagerTest {
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask));
         when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
         expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
+        expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
         expectUnlockFor(taskId03);
         makeTaskFolders(
             taskId00.toString(),
@@ -2121,6 +2139,7 @@ public class TaskManagerTest {
     private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> changelogOffsets,
                                            final Map<TaskId, Long> expectedOffsetSums) throws Exception {
         expectLockObtainedFor(taskId00);
+        expectDirectoryNotEmpty(taskId00);
         makeTaskFolders(taskId00.toString());
         replay(stateDirectory);
 
@@ -2144,6 +2163,7 @@ public class TaskManagerTest {
         final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));
 
         expectLockObtainedFor(taskId00);
+        expectDirectoryNotEmpty(taskId00);
         makeTaskFolders(taskId00.toString());
         replay(stateDirectory);
 
@@ -2243,6 +2263,7 @@ public class TaskManagerTest {
     public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception {
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
+        expectDirectoryNotEmpty(taskId00);
         expect(stateDirectory.checkpointFileFor(taskId00)).andReturn(getCheckpointFile(taskId00));
         replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
@@ -2350,6 +2371,7 @@ public class TaskManagerTest {
 
         makeTaskFolders(taskId00.toString(), taskId01.toString());
         expectLockObtainedFor(taskId00, taskId01);
+        expectDirectoryNotEmpty(taskId00, taskId01);
 
         // The second attempt will return empty tasks.
         makeTaskFolders();
@@ -4050,7 +4072,8 @@ public class TaskManagerTest {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
 
-        makeTaskFolders(taskId00.toString(), task01.toString());
+        makeTaskFolders(taskId00.toString(), taskId01.toString());
+        expectDirectoryNotEmpty(taskId00, taskId01);
         expectLockObtainedFor(taskId00, taskId01);
         expectRestoreToBeCompleted(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
@@ -4829,6 +4852,12 @@ public class TaskManagerTest {
         }
     }
 
+    private void expectDirectoryNotEmpty(final TaskId... tasks) {
+        for (final TaskId taskId : tasks) {
+            expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(false);
+        }
+    }
+
     private static void expectConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
         expect(consumer.assignment()).andReturn(assignment);
@@ -5119,6 +5148,7 @@ public class TaskManagerTest {
         Files.createFile(checkpointFile.toPath());
         new OffsetCheckpoint(checkpointFile).write(offsets);
         expect(stateDirectory.checkpointFileFor(task)).andReturn(checkpointFile);
+        expectDirectoryNotEmpty(task);
     }
 
     private File getCheckpointFile(final TaskId task) {