Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/06 05:13:47 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12583: KAFKA-10199: Separate state updater from old restore

guozhangwang commented on code in PR #12583:
URL: https://github.com/apache/kafka/pull/12583#discussion_r963256124


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -867,37 +870,47 @@ void runOnce() {
     }
 
     private void initializeAndRestorePhase() {
-        // only try to initialize the assigned tasks
-        // if the state is still in PARTITION_ASSIGNED after the poll call
+        final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = partitions -> resetOffsets(partitions, null);
         final State stateSnapshot = state;
-        if (stateSnapshot == State.PARTITIONS_ASSIGNED
-            || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+        if (stateUpdaterEnabled) {
+            checkStateUpdater();
+        } else {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
 
-            log.debug("State is {}; initializing tasks if necessary", stateSnapshot);
+                log.debug("State is {}; initializing tasks if necessary", stateSnapshot);
 
-            // transit to restore active is idempotent so we can call it multiple times
-            changelogReader.enforceRestoreActive();
+                if (taskManager.tryToCompleteRestoration(now, offsetResetter)) {
+                    log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs,
+                        taskManager.allTasks().keySet());
+                    setState(State.RUNNING);
+                }
 
-            if (taskManager.tryToCompleteRestoration(now, partitions -> resetOffsets(partitions, null))) {
-                changelogReader.transitToUpdateStandby();
-                log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs,
-                    taskManager.allTasks().keySet());
-                setState(State.RUNNING);
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization call done. State is {}", state);
+                }
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("Initialization call done. State is {}", state);
+                log.debug("Idempotently invoking restoration logic in state {}", state);
             }
+            // we can always let changelog reader try restoring in order to initialize the changelogs;
+            // if there's no active restoring or standby updating it would not try to fetch any data
+            // After KAFKA-13873, we only restore the not paused tasks.
+            changelogReader.restore(taskManager.notPausedTasks());
+            log.debug("Idempotent restore call done. Thread state has not changed.");
         }
+    }
 
-        if (log.isDebugEnabled()) {
-            log.debug("Idempotently invoking restoration logic in state {}", state);
+    private void checkStateUpdater() {
+        final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = partitions -> resetOffsets(partitions, null);
+        final State stateSnapshot = state;

Review Comment:
   Why bookkeep a snapshot here? We only change state within this thread, including in the rebalance listener, so there's no concurrency to worry about, I think?
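
To make the question concrete, here is a minimal sketch of the two ways to read the field. The names `SketchState` and `SketchThread` are hypothetical and are not taken from `StreamThread`; the sketch only assumes a `volatile` state field checked in a compound condition. If only the owning thread ever writes the field, both methods return the same result and the snapshot is redundant. The snapshot only matters if some other thread can write between the two reads.

```java
// Hypothetical sketch; these names are illustrative and not Kafka's actual classes.
enum SketchState { PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN }

class SketchThread {
    // volatile so that a write from another thread, if there is one, is visible to readers
    private volatile SketchState state = SketchState.PARTITIONS_ASSIGNED;

    void setState(final SketchState newState) {
        state = newState;
    }

    // Reads the field once, so both comparisons are guaranteed to see the same value.
    boolean needsInitWithSnapshot(final boolean needsRestoration) {
        final SketchState snapshot = state;
        return snapshot == SketchState.PARTITIONS_ASSIGNED
            || snapshot == SketchState.RUNNING && needsRestoration;
    }

    // Reads the field up to twice; equivalent to the version above
    // only when no other thread can write the field between the reads.
    boolean needsInitWithoutSnapshot(final boolean needsRestoration) {
        return state == SketchState.PARTITIONS_ASSIGNED
            || state == SketchState.RUNNING && needsRestoration;
    }
}
```

With a single writer thread, the two methods agree for every state, which is the case the comment above has in mind.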



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -610,73 +610,79 @@ private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final S
      * @throws StreamsException if the store's change log does not contain the partition
      * @return {@code true} if all tasks are fully restored
      */
-    boolean tryToCompleteRestoration(final long now, final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
-        if (stateUpdater == null) {
-            boolean allRunning = true;
+    boolean tryToCompleteRestoration(final long now,

Review Comment:
   Just to confirm: there are no changes in `tryToCompleteRestoration` except the parameter, right? The diff is quite confusing... If that's the case, I will skip comparing each line.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -685,6 +691,7 @@ private void recycleTask(final Task task,
                              final Map<TaskId, RuntimeException> taskExceptions) {
         Task newTask = null;
         try {
+            task.suspend();

Review Comment:
   Is this assuming that `suspend` is idempotent here? Without the state updater, would we call it twice, once in `handleRevocation` and once in `handleAssignment`?
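
For reference, this is roughly what "idempotent" would need to mean for a second call to be safe. The class `IdempotentTaskSketch` and its state names are hypothetical and do not reproduce Kafka's `Task` implementations; the sketch only assumes that a task tracks its own lifecycle state and that the suspend work should run at most once.

```java
// Hypothetical sketch; not Kafka's Task interface or any of its implementations.
class IdempotentTaskSketch {
    enum State { RUNNING, SUSPENDED, CLOSED }

    private State state = State.RUNNING;
    private int suspendWorkCount = 0; // counts how often the real suspend work ran

    void suspend() {
        switch (state) {
            case RUNNING:
                suspendWorkCount++; // stand-in for the actual suspend work
                state = State.SUSPENDED;
                break;
            case SUSPENDED:
                // already suspended: a repeated call is a no-op
                break;
            case CLOSED:
                throw new IllegalStateException("cannot suspend a closed task");
        }
    }

    void close() {
        state = State.CLOSED;
    }

    State state() {
        return state;
    }

    int suspendWorkCount() {
        return suspendWorkCount;
    }
}
```

Under this assumption, calling `suspend()` from both a revocation path and an assignment path leaves the task in the same state as a single call, and the suspend work runs once.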


