You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2023/09/27 15:51:37 UTC

[kafka] branch trunk updated: KAFKA-10199: Do not unlock state directories of tasks in state updater (#14442)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 673a25acc3a KAFKA-10199: Do not unlock state directories of tasks in state updater (#14442)
673a25acc3a is described below

commit 673a25acc3ac231c5fc1bfe0fcdd3c7e57f2de91
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Wed Sep 27 17:51:30 2023 +0200

    KAFKA-10199: Do not unlock state directories of tasks in state updater (#14442)
    
    When Streams completes a rebalance, it unlocks the state directories
    of all unassigned tasks. Unfortunately, when the state updater is
    enabled, Streams does not look into the state updater to determine
    the unassigned tasks, so tasks owned by the state updater are
    treated as unassigned.
    This commit corrects this by adding the check.
    
    Reviewer: Lucas Brutschy <lb...@confluent.io>
---
 .../streams/processor/internals/TaskManager.java   |  3 +-
 .../processor/internals/TaskManagerTest.java       | 42 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index e7d8bbb3dae..e117e3bd5ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1229,9 +1229,10 @@ public class TaskManager {
      */
     private void releaseLockedUnassignedTaskDirectories() {
         final Iterator<TaskId> taskIdIterator = lockedTaskDirectories.iterator();
+        final Map<TaskId, Task> allTasks = allTasks();
         while (taskIdIterator.hasNext()) {
             final TaskId id = taskIdIterator.next();
-            if (!tasks.contains(id)) {
+            if (!allTasks.containsKey(id)) {
                 stateDirectory.unlock(id);
                 taskIdIterator.remove();
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 705bea18d55..d534077cf2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -1684,6 +1684,48 @@ public class TaskManagerTest {
         verify(stateDirectory);
     }
 
+    @Test
+    public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater() throws Exception {
+        final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId01Partitions).build();
+        final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId02Partitions).build();
+        final StandbyTask unassignedStandbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
+            .inState(State.CREATED)
+            .withInputPartitions(taskId03Partitions).build();
+        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
+        when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask));
+        when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
+        expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
+        expectUnlockFor(taskId03);
+        makeTaskFolders(
+            taskId00.toString(),
+            taskId01.toString(),
+            taskId02.toString(),
+            taskId03.toString()
+        );
+        replay(stateDirectory);
+
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1, t1p2);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(mkSet(t1p1, t1p2));
+        replay(consumer);
+
+        taskManager.handleRebalanceStart(singleton("topic"));
+        taskManager.handleRebalanceComplete();
+
+        verify(consumer);
+        verify(stateDirectory);
+        assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
+    }
+
     @Test
     public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
         final Map<TopicPartition, Long> changelogOffsets = mkMap(