Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/24 19:18:24 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12200: KAFKA-10199: Implement adding standby tasks to the state updater

guozhangwang commented on code in PR #12200:
URL: https://github.com/apache/kafka/pull/12200#discussion_r880841005


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -206,30 +230,46 @@ private List<TaskAndAction> getTasksAndActions() {
 
         private void addTask(final Task task) {
             if (isStateless(task)) {
+                log.debug("Stateless active task " + task.id() + " was added to the state updater");
                 addTaskToRestoredTasks((StreamTask) task);
             } else {
-                updatingTasks.put(task.id(), task);
+                if (task.isActive()) {
+                    updatingTasks.put(task.id(), task);

Review Comment:
   nit: we can move this line out of the if-else block.
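A minimal sketch of the refactoring the nit suggests. It assumes, as the nit implies (the rest of the hunk is cut off), that the standby branch also calls `updatingTasks.put(task.id(), task)`. `SketchTask`, `AddTaskSketch`, and the per-branch log messages are hypothetical stand-ins, not the real `Task` or `DefaultStateUpdater` API. Only the stateless debug message is taken from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Streams Task interface, exposing only what this sketch needs.
final class SketchTask {
    final String id;
    final boolean active;
    final boolean stateless;

    SketchTask(final String id, final boolean active, final boolean stateless) {
        this.id = id;
        this.active = active;
        this.stateless = stateless;
    }
}

final class AddTaskSketch {
    final Map<String, SketchTask> updatingTasks = new HashMap<>();
    final List<SketchTask> restoredTasks = new ArrayList<>();
    final List<String> debugLog = new ArrayList<>();

    void addTask(final SketchTask task) {
        if (task.stateless) {
            debugLog.add("Stateless active task " + task.id + " was added to the state updater");
            restoredTasks.add(task);
        } else {
            // Shared by active and standby tasks, so it is hoisted out of the inner if-else.
            updatingTasks.put(task.id, task);
            if (task.active) {
                // Placeholder for active-only bookkeeping (hypothetical message).
                debugLog.add("Stateful active task " + task.id + " was added to the state updater");
            } else {
                // Placeholder for standby-only bookkeeping (hypothetical message).
                debugLog.add("Standby task " + task.id + " was added to the state updater");
            }
        }
    }
}
```

After the hoist, each branch keeps only the work that is actually specific to it.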



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -74,30 +76,44 @@ public Collection<Task> getAllUpdatingTasks() {
             return updatingTasks.values();
         }
 
+        public Collection<StandbyTask> getUpdatingStandbyTasks() {
+            return updatingTasks.values().stream()
+                .filter(t -> !t.isActive())
+                .map(t -> (StandbyTask) t)
+                .collect(Collectors.toList());
+        }
+
+        public boolean onlyStandbyTasksLeft() {
+            return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive());

Review Comment:
   I think we do not need the first condition: even if there are no tasks, calling `transitToUpdateStandby` does no harm, since the thread would return immediately from `changelogReader.restore` and then block in `waitIfAllChangelogsCompletelyRead` anyway. Maybe we can just rename it to `noActiveTaskLeft`, which only checks the second condition.
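The two predicates differ only when there are no tasks, because `Stream.allMatch` (like `Stream.noneMatch`) returns `true` on an empty stream. Below is a minimal sketch. The map of task ids to active flags is a hypothetical stand-in for `updatingTasks`. Writing `noActiveTaskLeft()` with `noneMatch` is a stylistic swap for `allMatch(t -> !t.isActive())`, not something the review asks for.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for updatingTasks: task id -> whether the task is active.
final class RemainingTasksSketch {
    final Map<String, Boolean> updatingTasks = new HashMap<>();

    // Mirrors onlyStandbyTasksLeft() from the diff: false when there are no tasks at all.
    boolean onlyStandbyTasksLeft() {
        return !updatingTasks.isEmpty()
            && updatingTasks.values().stream().allMatch(active -> !active);
    }

    // The suggested noActiveTaskLeft(): only the second condition. noneMatch(p) is
    // equivalent to allMatch(!p), and both are true on an empty stream, so this
    // returns true when no tasks remain.
    boolean noActiveTaskLeft() {
        return updatingTasks.values().stream().noneMatch(Boolean::booleanValue);
    }
}
```

With no tasks at all, `onlyStandbyTasksLeft()` is `false` while `noActiveTaskLeft()` is `true`. Once any task is present, the two agree.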



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -356,6 +433,36 @@ public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Ex
         assertTrue(stateUpdater.getAllTasks().isEmpty());
     }
 
+    private void verifyRestoredActiveTasks(final StreamTask... tasks) throws Exception {
+        final Set<StreamTask> expectedRestoredTasks = mkSet(tasks);
+        final Set<StreamTask> restoredTasks = new HashSet<>();
+        waitForCondition(
+            () -> {
+                restoredTasks.addAll(stateUpdater.getRestoredActiveTasks(Duration.ofMillis(CALL_TIMEOUT)));
+                return restoredTasks.size() == expectedRestoredTasks.size();
+            },
+            VERIFICATION_TIMEOUT,
+            "Did not get any restored active task within the given timeout!"

Review Comment:
   nit: `any` -> `all`?
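The nit matches the helper's logic. The condition keeps accumulating results until as many tasks have been collected as were expected, so on timeout the failure is not getting *all* of them. Below is a minimal, self-contained sketch of the same accumulate-and-wait pattern. `PollUntilAll.pollUntilAll` is hypothetical and stands in for the test's combination of `waitForCondition` and `getRestoredActiveTasks`. Unlike the helper in the diff, it checks `containsAll` instead of comparing set sizes.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

final class PollUntilAll {
    // Hypothetical helper, not Kafka's TestUtils.waitForCondition: keeps draining
    // batches from `poll` until every expected element has been seen, or fails
    // once `timeoutMs` has elapsed.
    static <T> Set<T> pollUntilAll(final Supplier<? extends Collection<T>> poll,
                                   final Set<T> expected,
                                   final long timeoutMs) {
        final Set<T> seen = new HashSet<>();
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            // Each call may return only some of the tasks, so results accumulate across polls.
            seen.addAll(poll.get());
            if (seen.containsAll(expected)) {
                return seen;
            }
            // Overflow-safe deadline check.
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Did not get all restored active tasks within the given timeout!");
            }
            // Back off briefly before polling again.
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        }
    }
}
```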



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -163,21 +172,12 @@ public void shouldRestoreSingleActiveStatefulTask() throws Exception {
 
         stateUpdater.add(task);
 
-        final Set<StreamTask> expectedRestoredTasks = Collections.singleton(task);
-        final Set<StreamTask> restoredTasks = new HashSet<>();
-        waitForCondition(
-            () -> {
-                restoredTasks.addAll(stateUpdater.getRestoredActiveTasks(Duration.ofMillis(CALL_TIMEOUT)));
-                return restoredTasks.size() == expectedRestoredTasks.size();
-            },
-            VERIFICATION_TIMEOUT,
-            "Did not get any restored active task within the given timeout!"
-        );
-        assertTrue(restoredTasks.containsAll(expectedRestoredTasks));
-        assertEquals(expectedRestoredTasks.size(), restoredTasks.stream().filter(t -> t.state() == State.RESTORING).count());
+        verifyRestoredActiveTasks(task);
         assertTrue(stateUpdater.getAllTasks().isEmpty());
-        verify(changelogReader, atLeast(3)).restore(anyMap());
+        verify(changelogReader, times(1)).enforceRestoreActive();
+        verify(changelogReader, atLeast(1)).restore(anyMap());

Review Comment:
   Why change `atLeast(3)` to `atLeast(1)` for `restore`?
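Some context on the question: in Mockito, `verify(mock, atLeast(n))` passes whenever the method was invoked `n` or more times. Lowering the bound from 3 to 1 therefore only weakens what the test asserts about how often `changelogReader.restore` ran. By contrast, `times(1)` on `enforceRestoreActive` requires exactly one call. Below is a minimal sketch of those semantics. `CallCounter` is a hypothetical hand-rolled counter, not Mockito, so the sketch runs without the library.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counter illustrating the two verification modes used in the test.
// It is not Mockito; it only reproduces the "at least n" vs "exactly n" semantics.
final class CallCounter {
    private final AtomicInteger calls = new AtomicInteger();

    void record() {
        calls.incrementAndGet();
    }

    // Like verify(mock, atLeast(n)): passes for any count >= n.
    void verifyAtLeast(final int n) {
        if (calls.get() < n) {
            throw new AssertionError("Wanted at least " + n + " calls but got " + calls.get());
        }
    }

    // Like verify(mock, times(n)): passes only for exactly n calls.
    void verifyTimes(final int n) {
        if (calls.get() != n) {
            throw new AssertionError("Wanted exactly " + n + " calls but got " + calls.get());
        }
    }
}
```

A single recorded call satisfies `verifyAtLeast(1)` but fails `verifyAtLeast(3)`. That is the difference between the old and new assertion.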



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
