You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/20 09:09:52 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12312: KAFKA-10199: Expose tasks in state updater

cadonna commented on code in PR #12312:
URL: https://github.com/apache/kafka/pull/12312#discussion_r901432897


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -327,6 +329,56 @@ public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreA
         orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
     }
 
+    @Test
+    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
+        final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+        final TaskCorruptedException taskCorruptedException =
+            new TaskCorruptedException(mkSet(activeTask1.id(), activeTask2.id()));
+        final Map<TaskId, Task> updatingTasks1 = mkMap(
+            mkEntry(activeTask1.id(), activeTask1),
+            mkEntry(activeTask2.id(), activeTask2),
+            mkEntry(standbyTask.id(), standbyTask)
+        );
+        doThrow(taskCorruptedException).doNothing().when(changelogReader).restore(updatingTasks1);
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+
+        stateUpdater.add(activeTask1);
+        stateUpdater.add(activeTask2);
+        stateUpdater.add(standbyTask);
+
+        final ExceptionAndTasks expectedExceptionAndTasks =
+            new ExceptionAndTasks(mkSet(activeTask1, activeTask2), taskCorruptedException);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+        final InOrder orderVerifier = inOrder(changelogReader);
+        orderVerifier.verify(changelogReader, atLeast(1)).enforceRestoreActive();
+        orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
+        final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+        when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        stateUpdater.add(activeTask1);
+        stateUpdater.add(activeTask2);
+        stateUpdater.add(standbyTask);
+        verifyUpdatingTasks(activeTask1, activeTask2, standbyTask);
+
+        stateUpdater.remove(activeTask1.id());
+        stateUpdater.remove(activeTask2.id());
+
+        verifyRemovedTasks(activeTask1, activeTask2);
+        final InOrder orderVerifier = inOrder(changelogReader);
+        orderVerifier.verify(changelogReader, atLeast(1)).enforceRestoreActive();
+        orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+

Review Comment:
   We missed to switch to updating standby tasks when all active tasks are removed or they failed. This are the corresponding unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org