Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/27 18:26:39 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8988: KAFKA-10199: Separate restore threads [WIP]

guozhangwang commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r478608933



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -411,7 +411,10 @@ boolean tryToCompleteRestoration() {
                 } else {
                     // we found a restoring task that isn't done restoring, which is evidence that
                     // not all tasks are running
+                    log.debug("Task {} has not completed restoration, will check next time", task.id());
+
                     allRunning = false;

Review comment:
       My plan is to modify the condition inside `StreamThread#runOnce` so that we can still process even if `state != RUNNING` (i.e. not all tasks have completed restoration yet), as long as `taskManager#hasRunnableActiveTask` returns true. Does that make sense?
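
   As a rough, self-contained sketch of that gating (not the actual `StreamThread` code; the `hasRunnableActiveTask` name is only an assumption taken from this comment and may change):

```java
// Simplified model of the intended condition: keep processing whenever at least one
// active task has finished restoring, even if the thread as a whole is not RUNNING yet.
enum ThreadState { PARTITIONS_ASSIGNED, RUNNING }

final class ProcessingGate {
    private final ThreadState state;
    private final boolean hasRunnableActiveTask;

    ProcessingGate(final ThreadState state, final boolean hasRunnableActiveTask) {
        this.state = state;
        this.hasRunnableActiveTask = hasRunnableActiveTask;
    }

    // before: process only when state == RUNNING (i.e. every task finished restoring);
    // after: also process while restoration is still on-going, as long as at least one
    //        active task is already runnable
    boolean shouldProcess() {
        return state == ThreadState.RUNNING || hasRunnableActiveTask;
    }
}
```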

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -365,72 +368,104 @@ private ChangelogMetadata restoringChangelogByPartition(final TopicPartition par
         return changelogMetadata;
     }
 
-    private Set<ChangelogMetadata> registeredChangelogs() {
+    private synchronized Set<ChangelogMetadata> offsetLimitChangelogs() {
+        return changelogs.entrySet().stream()
+                .filter(entry -> entry.getValue().stateManager.taskType() == Task.TaskType.STANDBY &&
+                        entry.getValue().stateManager.changelogAsSource(entry.getKey()))
+                .map(Map.Entry::getValue).collect(Collectors.toSet());
+    }
+
+    private synchronized Set<ChangelogMetadata> registeredChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.REGISTERED)
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> restoringChangelogs() {
+    private synchronized Set<ChangelogMetadata> restoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> activeRestoringChangelogs() {
+    private synchronized Set<TopicPartition> activeRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.ACTIVE)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> standbyRestoringChangelogs() {
+    private synchronized Set<TopicPartition> standbyRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.STANDBY)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private boolean allChangelogsCompleted() {
+    private synchronized Set<ChangelogMetadata> allChangelogs() {
+        // we need to make a shallow copy of this set for thread-safety
+        return new HashSet<>(changelogs.values());
+    }
+
+    private synchronized boolean allChangelogsCompleted() {
         return changelogs.values().stream()
             .allMatch(metadata -> metadata.changelogState == ChangelogState.COMPLETED);
     }
 
     @Override
-    public Set<TopicPartition> completedChangelogs() {
+    public synchronized Set<TopicPartition> completedChangelogs() {
         return changelogs.values().stream()
-            .filter(metadata -> metadata.changelogState == ChangelogState.COMPLETED)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
-            .collect(Collectors.toSet());
+                .filter(metadata -> metadata.changelogState == ChangelogState.COMPLETED)
+                .map(metadata -> metadata.storeMetadata.changelogPartition())
+                .collect(Collectors.toSet());
     }
 
-    // 1. if there are any registered changelogs that needs initialization, try to initialize them first;
-    // 2. if all changelogs have finished, return early;
-    // 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
-    public void restore() {
-        initializeChangelogs(registeredChangelogs());
+    /**
+     * 1. if there are any registered changelogs that need initialization, try to initialize them first;
+     * 2. if all changelogs have finished, return early;
+     * 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
+     *
+     * @throws StreamsException       If there is an unexpected exception thrown during the restoration
+     * @throws TaskCorruptedException If the changelog has been truncated while restoration is still on-going
+     */
+    public int restore() {
+        final ChangelogReaderState currentState;
+        synchronized (this) {
+            while (allChangelogsCompleted()) {
+                log.debug("All changelogs {} have completed restoration so far, will wait " +
+                        "until new changelogs are registered", changelogs.keySet());
+
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    log.trace("Interrupted with updated changelogs {}", changelogs.keySet());
+                }
+            }
+
+            currentState = state;
+        }
+
+        initializeChangelogs(currentState, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
+        if (!activeRestoringChangelogs().isEmpty() && currentState == ChangelogReaderState.STANDBY_UPDATING) {

Review comment:
       Actually in this PR I'm indeed moving both standby updating and active restoring to the new thread, so that the stream thread is ONLY responsible for processing active tasks. Do you think that is not the intended scope of this PR?
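
   To make the intended split concrete, here is a miniature, self-contained model (hypothetical simplified names, not the real classes) of which thread owns which loop:

```java
// The stream thread only polls and processes active tasks; the restore thread owns both
// active restoration and standby updating through the shared changelog reader.
final class ThreadRoles {
    interface ChangelogReaderLike {
        int restore(); // waits internally when there is nothing to restore or update
    }

    interface TaskManagerLike {
        void pollAndProcessActiveTasks();
    }

    static Runnable streamThreadLoop(final TaskManagerLike taskManager) {
        return () -> {
            while (!Thread.currentThread().isInterrupted()) {
                taskManager.pollAndProcessActiveTasks(); // active tasks only
            }
        };
    }

    static Runnable restoreThreadLoop(final ChangelogReaderLike changelogReader) {
        return () -> {
            while (!Thread.currentThread().isInterrupted()) {
                changelogReader.restore(); // active restoring + standby updating
            }
        };
    }
}
```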

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final StreamsConfig config;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final ConcurrentLinkedDeque<TaskCorruptedException> corruptedExceptions;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+        this.time = time;
+        this.config = config;
+        this.changelogReader = changelogReader;
+        this.corruptedExceptions = new ConcurrentLinkedDeque<>();
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+        this.log = logContext.logger(getClass());
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                final long startMs = time.milliseconds();
+
+                try {
+                    // try to restore some changelogs, if there's nothing to restore it would wait inside this call
+                    final int numRestored = changelogReader.restore();
+                    // TODO: we should record the restoration related metrics
+                    log.info("Restored {} records in {} ms", numRestored, time.milliseconds() - startMs);
+                } catch (final TaskCorruptedException e) {
+                    log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
+                            "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+
+                    // remove corrupted partitions from the changelog reader and continue; we can still proceed
+                    // and restore other partitions until the main thread comes to handle this exception
+                    changelogReader.unregister(e.corruptedTaskWithChangelogs().values().stream()

Review comment:
       I think that's a fair point --- all I care about here is letting the changelogReader continue without needing to fetch from this partition any more. Since the handling of the exception happens on the main thread, and is hence asynchronous to this thread, we do not want to keep hitting the same exception again and again.
   
   Let me think about how I can refactor this piece.
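
   A stripped-down, self-contained model of the hand-off described here (`TaskCorruptedException`, `unregister`, and `nextCorruptedException` are replaced with hypothetical stand-ins; this only illustrates the pattern, not the PR code):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// The restore thread records the corruption and stops fetching the affected partitions;
// the actual task-level handling happens later, asynchronously, on the main thread.
final class CorruptionHandOff {
    static final class CorruptedState extends RuntimeException {
        final Collection<String> changelogPartitions;

        CorruptedState(final Collection<String> changelogPartitions) {
            this.changelogPartitions = changelogPartitions;
        }
    }

    private final ConcurrentLinkedDeque<CorruptedState> pending = new ConcurrentLinkedDeque<>();
    // only touched by the restore thread in this model (stand-in for the reader's changelogs)
    private final List<String> registeredChangelogs = new ArrayList<>();

    // restore thread: park the exception, stop fetching these partitions, then carry on
    void onCorruption(final CorruptedState e) {
        registeredChangelogs.removeAll(e.changelogPartitions);
        pending.add(e);
    }

    // main thread, once per loop: rethrow the oldest pending corruption, if any
    void maybeRethrowCorruption() {
        final CorruptedState e = pending.poll();
        if (e != null) {
            throw e;
        }
    }
}
```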

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -659,13 +665,12 @@ void runOnce() {
             }
         }
 
-        // we can always let changelog reader try restoring in order to initialize the changelogs;
-        // if there's no active restoring or standby updating it would not try to fetch any data
-        changelogReader.restore();
-
-        // TODO: we should record the restore latency and its relative time spent ratio after
-        //       we figure out how to move this method out of the stream thread
-        advanceNowAndComputeLatency();
+        // check if restore thread has encountered TaskCorrupted exception; if yes
+        // rethrow it to trigger the handling logic
+        final TaskCorruptedException e = restoreThread.nextCorruptedException();

Review comment:
       Makes sense, will try to address this in this PR.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -365,72 +368,104 @@ private ChangelogMetadata restoringChangelogByPartition(final TopicPartition par
         return changelogMetadata;
     }
 
-    private Set<ChangelogMetadata> registeredChangelogs() {
+    private synchronized Set<ChangelogMetadata> offsetLimitChangelogs() {
+        return changelogs.entrySet().stream()
+                .filter(entry -> entry.getValue().stateManager.taskType() == Task.TaskType.STANDBY &&
+                        entry.getValue().stateManager.changelogAsSource(entry.getKey()))
+                .map(Map.Entry::getValue).collect(Collectors.toSet());
+    }
+
+    private synchronized Set<ChangelogMetadata> registeredChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.REGISTERED)
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> restoringChangelogs() {
+    private synchronized Set<ChangelogMetadata> restoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> activeRestoringChangelogs() {
+    private synchronized Set<TopicPartition> activeRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.ACTIVE)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> standbyRestoringChangelogs() {
+    private synchronized Set<TopicPartition> standbyRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.STANDBY)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private boolean allChangelogsCompleted() {
+    private synchronized Set<ChangelogMetadata> allChangelogs() {
+        // we need to make a shallow copy of this set for thread-safety
+        return new HashSet<>(changelogs.values());
+    }
+
+    private synchronized boolean allChangelogsCompleted() {
         return changelogs.values().stream()
             .allMatch(metadata -> metadata.changelogState == ChangelogState.COMPLETED);
     }
 
     @Override
-    public Set<TopicPartition> completedChangelogs() {
+    public synchronized Set<TopicPartition> completedChangelogs() {
         return changelogs.values().stream()
-            .filter(metadata -> metadata.changelogState == ChangelogState.COMPLETED)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
-            .collect(Collectors.toSet());
+                .filter(metadata -> metadata.changelogState == ChangelogState.COMPLETED)
+                .map(metadata -> metadata.storeMetadata.changelogPartition())
+                .collect(Collectors.toSet());
     }
 
-    // 1. if there are any registered changelogs that needs initialization, try to initialize them first;
-    // 2. if all changelogs have finished, return early;
-    // 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
-    public void restore() {
-        initializeChangelogs(registeredChangelogs());
+    /**
+     * 1. if there are any registered changelogs that need initialization, try to initialize them first;
+     * 2. if all changelogs have finished, return early;
+     * 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
+     *
+     * @throws StreamsException       If there is an unexpected exception thrown during the restoration
+     * @throws TaskCorruptedException If the changelog has been truncated while restoration is still on-going
+     */
+    public int restore() {
+        final ChangelogReaderState currentState;
+        synchronized (this) {
+            while (allChangelogsCompleted()) {
+                log.debug("All changelogs {} have completed restoration so far, will wait " +
+                        "until new changelogs are registered", changelogs.keySet());
+
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    log.trace("Interrupted with updated changelogs {}", changelogs.keySet());
+                }
+            }
+
+            currentState = state;
+        }
+
+        initializeChangelogs(currentState, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
+        if (!activeRestoringChangelogs().isEmpty() && currentState == ChangelogReaderState.STANDBY_UPDATING) {
             throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
         }
 
-        if (allChangelogsCompleted()) {
-            log.debug("Finished restoring all changelogs {}", changelogs.keySet());
-            return;
+        // we would pause or resume the partitions for standbys depending on the state, this operation is idempotent
+        if (currentState == ChangelogReaderState.ACTIVE_RESTORING) {
+            pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
+        } else {
+            resumeChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
         }
 
-        final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
+        final Set<ChangelogMetadata> restoringChangelogs = restoringChangelogs();
+        int totalRecordsRestored = 0;
         if (!restoringChangelogs.isEmpty()) {
             final ConsumerRecords<byte[], byte[]> polledRecords;
 
             try {
-                // for restoring active and updating standby we may prefer different poll time
-                // in order to make sure we call the main consumer#poll in time.
-                // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);

Review comment:
       We are actually moving the processing of the standbys to the other thread as well, so we do not need different poll times.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -310,18 +313,14 @@ public void enforceRestoreActive() {
     // be cleared when the corresponding task is being removed from the thread. In other words, the restore consumer
     // should contain all changelogs that are RESTORING or COMPLETED
     @Override
-    public void transitToUpdateStandby() {
+    public synchronized void transitToUpdateStandby() {

Review comment:
       The ultimate plan, which is also the goal of this PR, is to move both the processing of standbys and the restoring of actives (both captured in the ChangelogReader) to the same separate thread, off the main thread. Do you see a concern with this plan?
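
   As a self-contained illustration of why these reader methods become synchronized once a dedicated restore thread calls into them, here is a minimal wait/notify model; the `notifyAll()` placement in `register()` is an assumption for illustration, not the exact `StoreChangelogReader` code:

```java
import java.util.HashSet;
import java.util.Set;

// The restore thread parks when everything registered has completed restoration; any
// other thread that registers a new changelog wakes it up again.
final class RestoreCoordinator {
    private final Set<String> changelogs = new HashSet<>();
    private final Set<String> completed = new HashSet<>();

    // stream thread: a task registers a new changelog partition
    synchronized void register(final String changelogPartition) {
        changelogs.add(changelogPartition);
        notifyAll(); // wake the restore thread if it is parked with nothing to restore
    }

    // restore thread: a changelog has caught up
    synchronized void markCompleted(final String changelogPartition) {
        completed.add(changelogPartition);
    }

    // restore thread: block at the top of each loop until there is something to restore
    synchronized void awaitRestorableChangelogs() throws InterruptedException {
        while (completed.containsAll(changelogs)) {
            wait(); // also covers the "no changelogs registered yet" case
        }
    }
}
```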




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org