You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/12 12:00:44 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #8988: KAFKA-10199: Separate restore threads

cadonna commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r502525264



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final LinkedBlockingDeque<TaskItem> taskItemQueue;
+    private final AtomicReference<Set<TopicPartition>> completedChangelogs;
+    private final LinkedBlockingDeque<TaskCorruptedException> corruptedExceptions;
+    private final AtomicReference<RuntimeException> fatalException;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final Admin adminClient,
+                              final String groupId,
+                              final Consumer<byte[], byte[]> restoreConsumer,
+                              final StateRestoreListener userStateRestoreListener) {
+        this(time, threadClientId, new StoreChangelogReader(time, config, threadClientId,
+                adminClient, groupId, restoreConsumer, userStateRestoreListener));
+    }
+
+    // for testing only
+    public StateRestoreThread(final Time time,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.time = time;
+        this.log = logContext.logger(getClass());
+        this.taskItemQueue = new LinkedBlockingDeque<>();
+        this.fatalException = new AtomicReference<>();
+        this.corruptedExceptions = new LinkedBlockingDeque<>();
+        this.completedChangelogs = new AtomicReference<>(Collections.emptySet());
+
+        this.changelogReader = changelogReader;
+    }
+
+    private synchronized void waitIfAllChangelogsCompleted() {
+        final Set<TopicPartition> allChangelogs = changelogReader.allChangelogs();
+        if (allChangelogs.equals(changelogReader.completedChangelogs())) {
+            log.debug("All changelogs {} have completed restoration so far, will wait " +
+                    "until new changelogs are registered", allChangelogs);
+
+            while (isRunning.get() && taskItemQueue.isEmpty()) {
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public synchronized void addInitializedTasks(final List<Task> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Task task: tasks) {
+                taskItemQueue.add(new TaskItem(task, ItemType.CREATE, task.changelogPartitions()));
+            }
+            notifyAll();
+        }
+    }
+
+    public synchronized void addClosedTasks(final Map<Task, Collection<TopicPartition>> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Map.Entry<Task, Collection<TopicPartition>> entry : tasks.entrySet()) {
+                taskItemQueue.add(new TaskItem(entry.getKey(), ItemType.CLOSE, entry.getValue()));
+            }
+            notifyAll();
+        }
+    }
+
+    public Set<TopicPartition> completedChangelogs() {
+        return completedChangelogs.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                runOnce();
+            }
+        } catch (final RuntimeException e) {
+            log.error("Encountered the following exception while restoring states " +
+                    "and the thread is going to shut down: ", e);
+
+            // we would not throw the exception from the restore thread
+            // but would need the main thread to get and throw it
+            fatalException.set(e);
+        } finally {
+            // if the thread is exiting due to exception,
+            // we would still set its running flag
+            isRunning.set(false);
+
+            try {
+                changelogReader.clear();
+            } catch (final Throwable e) {
+                log.error("Failed to close changelog reader due to the following error:", e);
+            }
+
+            shutdownLatch.countDown();
+        }
+    }
+
+    // Visible for testing
+    void runOnce() {
+        waitIfAllChangelogsCompleted();
+
+        if (!isRunning.get())
+            return;
+
+        // a task being recycled maybe in both closed and initialized tasks,
+        // and hence we should process the closed ones first and then initialized ones

Review comment:
       What do you want to say with this comment? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -622,16 +613,42 @@ void runOnce() {
             return;
         }
 
-        initializeAndRestorePhase();
+        // we need to first add any closed revoked/corrupted/recycled tasks and then add the initialized tasks to update the changelogs of revived/recycled tasks
+        restoreThread.addClosedTasks(taskManager.drainRemovedTasks());
+
+        // try to initialize created tasks that are either newly assigned, recycled, or revived from corrupted tasks
+        final List<Task> initializedTasks = taskManager.tryInitializeNewTasks();
+        if (!initializedTasks.isEmpty()) {
+            log.info("Initialized new tasks {} under state {}, will start restoring them",
+                    initializedTasks.stream().map(Task::id).collect(Collectors.toList()), state);
+
+            restoreThread.addInitializedTasks(initializedTasks);
+        }
+
+        // try complete restoration if there are any restoring tasks
+        taskManager.tryToCompleteRestoration(restoreThread.completedChangelogs());
+
+        if (state == State.PARTITIONS_ASSIGNED && taskManager.allTasksRunning()) {
+            // it is possible that we have no assigned tasks in which case we would still transit state
+            setState(State.RUNNING);
 
-        // TODO: we should record the restore latency and its relative time spent ratio after
-        //       we figure out how to move this method out of the stream thread
-        advanceNowAndComputeLatency();
+            log.info("All tasks are now running");
+        }
+
+        // check if restore thread has encountered TaskCorrupted exception; if yes
+        // rethrow it to trigger the handling logic
+        final RuntimeException e = restoreThread.pollNextExceptionIfAny();
+        if (e != null) {
+            throw e;
+        }

Review comment:
       I think that could also be done in the task manager. What I mean is to call a method on the task manager that then throws the exception.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -93,16 +95,14 @@
     private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
     private java.util.function.Consumer<Set<TopicPartition>> resetter;
 
-    TaskManager(final ChangelogReader changelogReader,
-                final UUID processId,
+    TaskManager(final UUID processId,

Review comment:
       What do you think of passing the restore thread to the `TaskManager` and starting it in its constructor? I would even propose to create and start the restore thread in the `TaskManager`'s constructor. In such a way, the specifics of restoration are hidden in the `TaskManager`. WDYT?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -946,14 +934,14 @@ private void completeShutdown(final boolean cleanRun) {
         log.info("Shutting down");
 
         try {
-            taskManager.shutdown(cleanRun);
+            restoreThread.shutdown(10_000L);

Review comment:
       I also think that it makes sense to pass the timeout to `shutdown()`. At least for the code path originating from `KafkaStreams#close()`, we can apply the timeout passed to `close()`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -622,16 +613,42 @@ void runOnce() {
             return;
         }
 
-        initializeAndRestorePhase();
+        // we need to first add any closed revoked/corrupted/recycled tasks and then add the initialized tasks to update the changelogs of revived/recycled tasks
+        restoreThread.addClosedTasks(taskManager.drainRemovedTasks());
+
+        // try to initialize created tasks that are either newly assigned, recycled, or revived from corrupted tasks
+        final List<Task> initializedTasks = taskManager.tryInitializeNewTasks();
+        if (!initializedTasks.isEmpty()) {
+            log.info("Initialized new tasks {} under state {}, will start restoring them",
+                    initializedTasks.stream().map(Task::id).collect(Collectors.toList()), state);
+
+            restoreThread.addInitializedTasks(initializedTasks);
+        }
+
+        // try complete restoration if there are any restoring tasks
+        taskManager.tryToCompleteRestoration(restoreThread.completedChangelogs());

Review comment:
       Would it be possible to move this code into the task manager and have just one call to the task manager here that returns whether restoration is done?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final LinkedBlockingDeque<TaskItem> taskItemQueue;
+    private final AtomicReference<Set<TopicPartition>> completedChangelogs;
+    private final LinkedBlockingDeque<TaskCorruptedException> corruptedExceptions;
+    private final AtomicReference<RuntimeException> fatalException;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final Admin adminClient,
+                              final String groupId,
+                              final Consumer<byte[], byte[]> restoreConsumer,
+                              final StateRestoreListener userStateRestoreListener) {
+        this(time, threadClientId, new StoreChangelogReader(time, config, threadClientId,
+                adminClient, groupId, restoreConsumer, userStateRestoreListener));
+    }
+
+    // for testing only
+    public StateRestoreThread(final Time time,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.time = time;
+        this.log = logContext.logger(getClass());
+        this.taskItemQueue = new LinkedBlockingDeque<>();
+        this.fatalException = new AtomicReference<>();
+        this.corruptedExceptions = new LinkedBlockingDeque<>();
+        this.completedChangelogs = new AtomicReference<>(Collections.emptySet());
+
+        this.changelogReader = changelogReader;
+    }
+
+    private synchronized void waitIfAllChangelogsCompleted() {
+        final Set<TopicPartition> allChangelogs = changelogReader.allChangelogs();
+        if (allChangelogs.equals(changelogReader.completedChangelogs())) {
+            log.debug("All changelogs {} have completed restoration so far, will wait " +
+                    "until new changelogs are registered", allChangelogs);
+
+            while (isRunning.get() && taskItemQueue.isEmpty()) {
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public synchronized void addInitializedTasks(final List<Task> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Task task: tasks) {
+                taskItemQueue.add(new TaskItem(task, ItemType.CREATE, task.changelogPartitions()));
+            }
+            notifyAll();
+        }
+    }
+
+    public synchronized void addClosedTasks(final Map<Task, Collection<TopicPartition>> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Map.Entry<Task, Collection<TopicPartition>> entry : tasks.entrySet()) {
+                taskItemQueue.add(new TaskItem(entry.getKey(), ItemType.CLOSE, entry.getValue()));
+            }
+            notifyAll();
+        }
+    }
+
+    public Set<TopicPartition> completedChangelogs() {
+        return completedChangelogs.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                runOnce();
+            }
+        } catch (final RuntimeException e) {
+            log.error("Encountered the following exception while restoring states " +
+                    "and the thread is going to shut down: ", e);
+
+            // we would not throw the exception from the restore thread
+            // but would need the main thread to get and throw it
+            fatalException.set(e);
+        } finally {
+            // if the thread is exiting due to exception,
+            // we would still set its running flag
+            isRunning.set(false);
+
+            try {
+                changelogReader.clear();
+            } catch (final Throwable e) {
+                log.error("Failed to close changelog reader due to the following error:", e);
+            }
+
+            shutdownLatch.countDown();
+        }
+    }
+
+    // Visible for testing
+    void runOnce() {
+        waitIfAllChangelogsCompleted();
+
+        if (!isRunning.get())
+            return;
+
+        // a task being recycled maybe in both closed and initialized tasks,
+        // and hence we should process the closed ones first and then initialized ones
+        final List<TaskItem> items = new ArrayList<>();
+        taskItemQueue.drainTo(items);
+
+        for (final TaskItem item : items) {
+            // TODO KAFKA-10575: we should consider also call the listener if the
+            //                   task is closed but not yet completed restoration
+            if (item.type == ItemType.CLOSE) {
+                changelogReader.unregister(item.changelogPartitions);
+
+                log.info("Unregistered changelogs {} for closing task {}",
+                        item.task.changelogPartitions(),
+                        item.task.id());
+            } else if (item.type == ItemType.CREATE) {
+                // we should only convert the state manager type right before re-registering the changelog
+                item.task.stateManager().maybeCompleteTaskTypeConversion();
+
+                for (final TopicPartition partition : item.changelogPartitions) {
+                    changelogReader.register(partition, item.task.stateManager());
+                }
+
+                log.info("Registered changelogs {} for created task {}",
+                        item.task.changelogPartitions(),
+                        item.task.id());
+            }
+        }
+        items.clear();
+
+        // try to restore some changelogs
+        final long startMs = time.milliseconds();
+        try {
+            final int numRestored = changelogReader.restore();
+            // TODO KIP-444: we should record the restoration related metrics including restore-ratio
+            log.debug("Restored {} records in {} ms", numRestored, time.milliseconds() - startMs);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
+                    "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+
+            // remove corrupted partitions form the changelog reader and continue; we can still proceed
+            // and restore other partitions until the main thread come to handle this exception
+            changelogReader.unregister(e.corruptedTaskWithChangelogs().values().stream()
+                    .flatMap(Collection::stream)
+                    .collect(Collectors.toList()));
+
+            corruptedExceptions.add(e);
+        } catch (final StreamsException e) {
+            // if we are shutting down, the consumer could throw interrupt exception which can be ignored;
+            // otherwise, we re-throw
+            if (!(e.getCause() instanceof InterruptException) || isRunning.get()) {
+                throw e;
+            }
+        } catch (final TimeoutException e) {
+            log.info("Encountered timeout when restoring states, will retry in the next loop");
+        }
+
+        // finally update completed changelogs
+        completedChangelogs.set(changelogReader.completedChangelogs());

Review comment:
       I would extract this code into a method named `tryRestoreChangelogs()`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final LinkedBlockingDeque<TaskItem> taskItemQueue;
+    private final AtomicReference<Set<TopicPartition>> completedChangelogs;
+    private final LinkedBlockingDeque<TaskCorruptedException> corruptedExceptions;
+    private final AtomicReference<RuntimeException> fatalException;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final Admin adminClient,
+                              final String groupId,
+                              final Consumer<byte[], byte[]> restoreConsumer,
+                              final StateRestoreListener userStateRestoreListener) {
+        this(time, threadClientId, new StoreChangelogReader(time, config, threadClientId,
+                adminClient, groupId, restoreConsumer, userStateRestoreListener));
+    }
+
+    // for testing only
+    public StateRestoreThread(final Time time,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.time = time;
+        this.log = logContext.logger(getClass());
+        this.taskItemQueue = new LinkedBlockingDeque<>();
+        this.fatalException = new AtomicReference<>();
+        this.corruptedExceptions = new LinkedBlockingDeque<>();
+        this.completedChangelogs = new AtomicReference<>(Collections.emptySet());
+
+        this.changelogReader = changelogReader;
+    }
+
+    private synchronized void waitIfAllChangelogsCompleted() {
+        final Set<TopicPartition> allChangelogs = changelogReader.allChangelogs();
+        if (allChangelogs.equals(changelogReader.completedChangelogs())) {
+            log.debug("All changelogs {} have completed restoration so far, will wait " +
+                    "until new changelogs are registered", allChangelogs);
+
+            while (isRunning.get() && taskItemQueue.isEmpty()) {
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public synchronized void addInitializedTasks(final List<Task> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Task task: tasks) {
+                taskItemQueue.add(new TaskItem(task, ItemType.CREATE, task.changelogPartitions()));
+            }
+            notifyAll();
+        }
+    }
+
+    public synchronized void addClosedTasks(final Map<Task, Collection<TopicPartition>> tasks) {
+        if (!tasks.isEmpty()) {
+            for (final Map.Entry<Task, Collection<TopicPartition>> entry : tasks.entrySet()) {
+                taskItemQueue.add(new TaskItem(entry.getKey(), ItemType.CLOSE, entry.getValue()));
+            }
+            notifyAll();
+        }
+    }
+
+    public Set<TopicPartition> completedChangelogs() {
+        return completedChangelogs.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                runOnce();
+            }
+        } catch (final RuntimeException e) {
+            log.error("Encountered the following exception while restoring states " +
+                    "and the thread is going to shut down: ", e);
+
+            // we would not throw the exception from the restore thread
+            // but would need the main thread to get and throw it
+            fatalException.set(e);
+        } finally {
+            // if the thread is exiting due to exception,
+            // we would still set its running flag
+            isRunning.set(false);
+
+            try {
+                changelogReader.clear();
+            } catch (final Throwable e) {
+                log.error("Failed to close changelog reader due to the following error:", e);
+            }
+
+            shutdownLatch.countDown();
+        }
+    }
+
+    // Visible for testing
+    void runOnce() {
+        waitIfAllChangelogsCompleted();
+
+        if (!isRunning.get())
+            return;
+
+        // a task being recycled maybe in both closed and initialized tasks,
+        // and hence we should process the closed ones first and then initialized ones
+        final List<TaskItem> items = new ArrayList<>();
+        taskItemQueue.drainTo(items);
+
+        for (final TaskItem item : items) {
+            // TODO KAFKA-10575: we should consider also call the listener if the
+            //                   task is closed but not yet completed restoration
+            if (item.type == ItemType.CLOSE) {
+                changelogReader.unregister(item.changelogPartitions);
+
+                log.info("Unregistered changelogs {} for closing task {}",
+                        item.task.changelogPartitions(),
+                        item.task.id());
+            } else if (item.type == ItemType.CREATE) {
+                // we should only convert the state manager type right before re-registering the changelog
+                item.task.stateManager().maybeCompleteTaskTypeConversion();
+
+                for (final TopicPartition partition : item.changelogPartitions) {
+                    changelogReader.register(partition, item.task.stateManager());
+                }
+
+                log.info("Registered changelogs {} for created task {}",
+                        item.task.changelogPartitions(),
+                        item.task.id());
+            }
+        }
+        items.clear();

Review comment:
       I would extract this code into a method with a meaningful name.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -55,16 +55,16 @@
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
 
 /**
- * ChangelogReader is created and maintained by the stream thread and used for both updating standby tasks and
+ * ChangelogReader is created and shared by the stream thread and restore thread. It is used for both updating standby tasks and

Review comment:
       As far as I can see, the store changelog reader is not shared but exclusively maintained by the restore thread.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org