You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/10/11 16:14:13 UTC

[kafka] branch trunk updated: HOTFIX: Only update input partitions of standby tasks if they really changed (#12730)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new daae2a189de HOTFIX: Only update input partitions of standby tasks if they really changed (#12730)
daae2a189de is described below

commit daae2a189de21e4fed42eb7bd204db2e8636eb4a
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Tue Oct 11 18:13:58 2022 +0200

    HOTFIX: Only update input partitions of standby tasks if they really changed (#12730)
    
    Updating the input partitions of tasks also updates the mapping from
    source nodes to input topics in the processor topology within the task.
    The mapping is updated with the topics from the topology metadata.
    The topology metadata does not prefix intermediate internal topics with
    the application ID. Thus, if a standby task has input partitions from an
    intermediate internal topic, the update of the mapping in the processor
    topology leads to an invalid topology exception when the standby task is
    recycled to an active task and the input queues are created. The
    exception occurs because the input topics in the processor topology lack
    the application ID prefix and therefore do not match the input
    partitions of the task.
    
    The added check, which updates the input partitions of a standby task
    only if they really changed, avoids the invalid topology exception if
    the standby task only has input partitions from intermediate internal
    topics, since those should never change. If the standby task has input
    partitions from both intermediate internal topics and external topics
    subscribed to via a regex pattern, the invalid topology exception
    might still be triggered.
    
    Reviewers: Guozhang Wang <gu...@apache.org>, John Roesler <vv...@apache.org>
---
 .../streams/processor/internals/TaskManager.java   | 24 ++++++++++++++--
 .../processor/internals/TaskManagerTest.java       | 32 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 31eba40ae61..48afe51aeb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -408,8 +408,7 @@ public class TaskManager {
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
                 if (!task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = standbyTasksToCreate.get(taskId);
-                    task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+                    updateInputPartitionsOfStandbyTaskIfTheyChanged(task, standbyTasksToCreate.get(taskId));
                     task.resume();
                 } else {
                     tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
@@ -421,6 +420,27 @@ public class TaskManager {
         }
     }
 
+    private void updateInputPartitionsOfStandbyTaskIfTheyChanged(final Task task,
+                                                                 final Set<TopicPartition> inputPartitions) {
+        /*
+        We should only update the input partitions of a standby task if the input partitions really changed. Updating
+        the input partitions of a task also updates the mapping from source nodes to input topics in the processor
+        topology within the task. The mapping is updated with the topics from the topology metadata. The topology
+        metadata does not prefix intermediate internal topics with the application ID. Thus, if a standby task has
+        input partitions from an intermediate internal topic, updating the mapping in the processor topology leads to
+        an invalid topology exception when the standby task is recycled to an active task and the input queues are
+        created: the input topics in the processor topology lack the application ID prefix and therefore do not match
+        the input partitions of the task.
+        For standby tasks that have only input partitions from intermediate internal topics, this check avoids the
+        invalid topology exception. Unfortunately, a subtopology might have input partitions subscribed to via a regex
+        in addition to intermediate internal topics, which might still lead to an invalid topology exception during
+        recycling irrespective of this check. Thus, there is still a bug to fix here.
+         */
+        if (!task.inputPartitions().equals(inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+    }
+
     private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
                                              final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                              final Map<Task, Set<TopicPartition>> tasksToRecycle,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 82683ec4488..69688df6f63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -249,6 +249,38 @@ public class TaskManagerTest {
         Mockito.verifyNoInteractions(stateUpdater);
     }
 
+    @Test
+    public void shouldNotUpdateExistingStandbyTaskIfStandbyIsReassignedWithSameInputPartitionWithoutStateUpdater() {
+        final StandbyTask standbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId03Partitions).build(); // reassigned below with this same set: no update expected
+        updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, taskId03Partitions);
+        Mockito.verify(standbyTask, never()).updateInputPartitions(Mockito.eq(taskId03Partitions), Mockito.any());
+    }
+
+    @Test
+    public void shouldUpdateExistingStandbyTaskIfStandbyIsReassignedWithDifferentInputPartitionWithoutStateUpdater() {
+        final StandbyTask standbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId03Partitions).build(); // reassigned below with taskId04Partitions: update expected
+        updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, taskId04Partitions);
+        Mockito.verify(standbyTask).updateInputPartitions(Mockito.eq(taskId04Partitions), Mockito.any());
+    }
+
+    private void updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(final Task standbyTask,
+                                                                                   final Set<TopicPartition> newInputPartition) {
+        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(mkSet(standbyTask)); // the registry mock exposes only the given standby task
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
+        // NOTE(review): the final 'false' argument presumably disables the state updater; confirm in setUpTaskManager
+        taskManager.handleAssignment(
+            Collections.emptyMap(), // no active tasks are assigned
+            mkMap(mkEntry(standbyTask.id(), newInputPartition)) // the existing standby task is assigned newInputPartition
+        );
+
+        Mockito.verify(standbyTask).resume(); // both callers expect resume(), whether or not the partitions changed
+    }
+
     @Test
     public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {
         final StreamTask activeTaskToRecycle = statefulTask(taskId03, taskId03ChangelogPartitions)