Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/23 23:08:42 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8988: KAFKA-10199: Separate restore threads [WIP]

ableegoldman commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r459768271



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -365,72 +368,104 @@ private ChangelogMetadata restoringChangelogByPartition(final TopicPartition par
         return changelogMetadata;
     }
 
-    private Set<ChangelogMetadata> registeredChangelogs() {
+    private synchronized Set<ChangelogMetadata> offsetLimitChangelogs() {
+        return changelogs.entrySet().stream()
+                .filter(entry -> entry.getValue().stateManager.taskType() == Task.TaskType.STANDBY &&
+                        entry.getValue().stateManager.changelogAsSource(entry.getKey()))
+                .map(Map.Entry::getValue).collect(Collectors.toSet());
+    }
+
+    private synchronized Set<ChangelogMetadata> registeredChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.REGISTERED)
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> restoringChangelogs() {
+    private synchronized Set<ChangelogMetadata> restoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> activeRestoringChangelogs() {
+    private synchronized Set<TopicPartition> activeRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.ACTIVE)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> standbyRestoringChangelogs() {
+    private synchronized Set<TopicPartition> standbyRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.STANDBY)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private boolean allChangelogsCompleted() {
+    private synchronized Set<ChangelogMetadata> allChangelogs() {
+        // we need to make a shallow copy of this set for thread-safety
+        return new HashSet<>(changelogs.values());
+    }
+
+    private synchronized boolean allChangelogsCompleted() {
         return changelogs.values().stream()
             .allMatch(metadata -> metadata.changelogState == ChangelogState.COMPLETED);
     }
 
     @Override
-    public Set<TopicPartition> completedChangelogs() {
+    public synchronized Set<TopicPartition> completedChangelogs() {
         return changelogs.values().stream()
-            .filter(metadata -> metadata.changelogState == ChangelogState.COMPLETED)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
-            .collect(Collectors.toSet());
+                .filter(metadata -> metadata.changelogState == ChangelogState.COMPLETED)
+                .map(metadata -> metadata.storeMetadata.changelogPartition())
+                .collect(Collectors.toSet());
     }
 
-    // 1. if there are any registered changelogs that needs initialization, try to initialize them first;
-    // 2. if all changelogs have finished, return early;
-    // 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
-    public void restore() {
-        initializeChangelogs(registeredChangelogs());
+    /**
+     * 1. if there are any registered changelogs that need initialization, try to initialize them first;
+     * 2. if all changelogs have finished, return early;
+     * 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
+     *
+     * @throws StreamsException       If an unexpected exception is thrown during the restoration
+     * @throws TaskCorruptedException If the changelog has been truncated while restoration is still on-going
+     */
+    public int restore() {
+        final ChangelogReaderState currentState;
+        synchronized (this) {
+            while (allChangelogsCompleted()) {
+                log.debug("All changelogs {} have completed restoration so far, will wait " +
+                        "until new changelogs are registered", changelogs.keySet());
+
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    log.trace("Interrupted with updated changelogs {}", changelogs.keySet());
+                }
+            }
+
+            currentState = state;
+        }
+
+        initializeChangelogs(currentState, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
+        if (!activeRestoringChangelogs().isEmpty() && currentState == ChangelogReaderState.STANDBY_UPDATING) {

Review comment:
       Why not let the StreamThread process some standbys while the restore thread does its restoring? I'm not sure whether we plan to ultimately move standbys to a new thread or have them share the restore thread, but it seems like we shouldn't block them on restoration; otherwise we're missing out on a huge piece of the available improvement.
   
   This matters especially with KIP-441, where for most tasks the majority of restoration will actually happen while the task is a standby rather than during active restoration.
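Relevant to the hunk above: the new `restore()` parks the restore thread in `wait()` while every registered changelog is completed, so the code path that registers new changelogs (not shown in this hunk) has to call `notifyAll()` on the same monitor, or the restore thread may never wake up. A minimal sketch of that guarded-wait pattern, using a hypothetical `ChangelogRegistry` with string partition names in place of the real `StoreChangelogReader` bookkeeping:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the changelog bookkeeping in StoreChangelogReader:
// changelog partition name -> whether its restoration has completed.
class ChangelogRegistry {
    private final Map<String, Boolean> completed = new HashMap<>();

    // Called from the StreamThread when a task's changelog is registered.
    synchronized void register(final String partition) {
        completed.put(partition, false);
        notifyAll(); // wake a restore thread parked in awaitRestorableWork()
    }

    // Called from the restore thread once a changelog is fully restored.
    synchronized void markCompleted(final String partition) {
        completed.put(partition, true);
    }

    // Called from the restore thread. Blocks while every registered changelog is
    // completed (including when none are registered), like the
    // while (allChangelogsCompleted()) wait() loop in the hunk.
    // Returns true when there is work, false if the thread was interrupted.
    synchronized boolean awaitRestorableWork() {
        while (completed.values().stream().allMatch(done -> done)) {
            try {
                wait();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
        return true;
    }
}
```

One design difference from the hunk: on `InterruptedException` the sketch restores the interrupt flag and returns `false` instead of logging and looping, which gives a shutdown path a way to break the thread out of the wait. Whether that fits the PR's shutdown logic is an open question.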

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -411,7 +411,10 @@ boolean tryToCompleteRestoration() {
                 } else {
                     // we found a restoring task that isn't done restoring, which is evidence that
                     // not all tasks are running
+                    log.debug("Task {} has not completed restoration, will check next time", task.id());
+
                     allRunning = false;

Review comment:
       Don't we want to modify the TaskManager so the StreamThread doesn't have to wait for all tasks to finish restoring? It should be able to start processing any active tasks as soon as they finish restoring in the other thread. 
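A rough sketch of that per-task check, assuming the main thread can read a snapshot of completed changelog partitions (as `completedChangelogs()` returns) and knows which changelogs each restoring task owns. `IncrementalRestoreCompletion` and `readyTasks` are hypothetical names, and task ids and partitions are plain strings rather than `TaskId`/`TopicPartition`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: decide per task, not all-or-nothing, which restoring
// active tasks can start processing.
final class IncrementalRestoreCompletion {
    private IncrementalRestoreCompletion() { }

    /**
     * @param restoringTasks      task id -> changelog partitions that task must restore
     * @param completedChangelogs snapshot of partitions whose restoration has completed
     * @return ids of tasks whose changelogs are all completed and can transition now
     */
    static List<String> readyTasks(final Map<String, Set<String>> restoringTasks,
                                   final Set<String> completedChangelogs) {
        final List<String> ready = new ArrayList<>();
        for (final Map.Entry<String, Set<String>> entry : restoringTasks.entrySet()) {
            // A task is ready as soon as its own changelogs are done,
            // regardless of how far other tasks have gotten.
            if (completedChangelogs.containsAll(entry.getValue())) {
                ready.add(entry.getKey());
            }
        }
        return ready;
    }
}
```

The main thread could call this on each loop iteration and start processing the returned tasks immediately, while the remaining tasks keep restoring in the other thread.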

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -659,13 +665,12 @@ void runOnce() {
             }
         }
 
-        // we can always let changelog reader try restoring in order to initialize the changelogs;
-        // if there's no active restoring or standby updating it would not try to fetch any data
-        changelogReader.restore();
-
-        // TODO: we should record the restore latency and its relative time spent ratio after
-        //       we figure out how to move this method out of the stream thread
-        advanceNowAndComputeLatency();
+        // check if restore thread has encountered TaskCorrupted exception; if yes
+        // rethrow it to trigger the handling logic
+        final TaskCorruptedException e = restoreThread.nextCorruptedException();

Review comment:
       Might be nice to gather and handle all TaskCorruptedExceptions at once rather than one per loop iteration like this, especially since handling each one likely involves committing all tasks (and I would imagine that with EOS, when we get one TaskCorruptedException we are likely to also get more). That can definitely be follow-on work; just putting it out there.
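A minimal sketch of draining the queue in one pass, assuming a hypothetical `CorruptionReport` in place of `TaskCorruptedException` (corrupted task id mapped to its changelog partitions). It empties a `ConcurrentLinkedDeque`, like the `corruptedExceptions` deque that `StateRestoreThread` keeps, and merges the per-task changelogs into a single map that could be handled, and committed, once:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical stand-in for TaskCorruptedException: corrupted task id -> its changelog partitions.
final class CorruptionReport {
    final Map<String, Collection<String>> corruptedTaskWithChangelogs;

    CorruptionReport(final Map<String, Collection<String>> corruptedTaskWithChangelogs) {
        this.corruptedTaskWithChangelogs = corruptedTaskWithChangelogs;
    }
}

final class CorruptionDrainer {
    private CorruptionDrainer() { }

    // Called from the main thread: removes every report queued so far by the
    // restore thread and merges them, so all corrupted tasks are handled together.
    static Map<String, Collection<String>> drainAll(final ConcurrentLinkedDeque<CorruptionReport> queue) {
        final Map<String, Collection<String>> merged = new HashMap<>();
        CorruptionReport report;
        // poll() returns null once the deque is empty.
        while ((report = queue.poll()) != null) {
            report.corruptedTaskWithChangelogs.forEach((task, changelogs) ->
                merged.computeIfAbsent(task, t -> new HashSet<>()).addAll(changelogs));
        }
        return merged;
    }
}
```

Reports that the restore thread enqueues after the drain finishes are simply picked up on the next call, so no exception is lost.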

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -365,72 +368,104 @@ private ChangelogMetadata restoringChangelogByPartition(final TopicPartition par
         return changelogMetadata;
     }
 
-    private Set<ChangelogMetadata> registeredChangelogs() {
+    private synchronized Set<ChangelogMetadata> offsetLimitChangelogs() {
+        return changelogs.entrySet().stream()
+                .filter(entry -> entry.getValue().stateManager.taskType() == Task.TaskType.STANDBY &&
+                        entry.getValue().stateManager.changelogAsSource(entry.getKey()))
+                .map(Map.Entry::getValue).collect(Collectors.toSet());
+    }
+
+    private synchronized Set<ChangelogMetadata> registeredChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.REGISTERED)
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> restoringChangelogs() {
+    private synchronized Set<ChangelogMetadata> restoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> activeRestoringChangelogs() {
+    private synchronized Set<TopicPartition> activeRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.ACTIVE)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> standbyRestoringChangelogs() {
+    private synchronized Set<TopicPartition> standbyRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.STANDBY)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private boolean allChangelogsCompleted() {
+    private synchronized Set<ChangelogMetadata> allChangelogs() {
+        // we need to make a shallow copy of this set for thread-safety
+        return new HashSet<>(changelogs.values());
+    }
+
+    private synchronized boolean allChangelogsCompleted() {
         return changelogs.values().stream()
             .allMatch(metadata -> metadata.changelogState == ChangelogState.COMPLETED);
     }
 
     @Override
-    public Set<TopicPartition> completedChangelogs() {
+    public synchronized Set<TopicPartition> completedChangelogs() {
         return changelogs.values().stream()
-            .filter(metadata -> metadata.changelogState == ChangelogState.COMPLETED)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
-            .collect(Collectors.toSet());
+                .filter(metadata -> metadata.changelogState == ChangelogState.COMPLETED)
+                .map(metadata -> metadata.storeMetadata.changelogPartition())
+                .collect(Collectors.toSet());
     }
 
-    // 1. if there are any registered changelogs that needs initialization, try to initialize them first;
-    // 2. if all changelogs have finished, return early;
-    // 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
-    public void restore() {
-        initializeChangelogs(registeredChangelogs());
+    /**
+     * 1. if there are any registered changelogs that need initialization, try to initialize them first;
+     * 2. if all changelogs have finished, return early;
+     * 3. if there are any restoring changelogs, try to read from the restore consumer and process them.
+     *
+     * @throws StreamsException       If an unexpected exception is thrown during the restoration
+     * @throws TaskCorruptedException If the changelog has been truncated while restoration is still on-going
+     */
+    public int restore() {
+        final ChangelogReaderState currentState;
+        synchronized (this) {
+            while (allChangelogsCompleted()) {
+                log.debug("All changelogs {} have completed restoration so far, will wait " +
+                        "until new changelogs are registered", changelogs.keySet());
+
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    log.trace("Interrupted with updated changelogs {}", changelogs.keySet());
+                }
+            }
+
+            currentState = state;
+        }
+
+        initializeChangelogs(currentState, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
+        if (!activeRestoringChangelogs().isEmpty() && currentState == ChangelogReaderState.STANDBY_UPDATING) {
             throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
         }
 
-        if (allChangelogsCompleted()) {
-            log.debug("Finished restoring all changelogs {}", changelogs.keySet());
-            return;
+        // we would pause or resume the partitions for standbys depending on the state, this operation is idempotent
+        if (currentState == ChangelogReaderState.ACTIVE_RESTORING) {
+            pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
+        } else {
+            resumeChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
         }
 
-        final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
+        final Set<ChangelogMetadata> restoringChangelogs = restoringChangelogs();
+        int totalRecordsRestored = 0;
         if (!restoringChangelogs.isEmpty()) {
             final ConsumerRecords<byte[], byte[]> polledRecords;
 
             try {
-                // for restoring active and updating standby we may prefer different poll time
-                // in order to make sure we call the main consumer#poll in time.
-                // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);

Review comment:
       Why did we remove this logic? AFAICT standbys are still processed in the main StreamThread, and the whole point of this was to make sure we don't block active processing on polling for standby tasks.
   
   But I wasn't paying close attention to this particular issue/PR, so I may have misremembered or misunderstood.
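For reference, the removed line chose the restore consumer's poll timeout from the reader state. Isolated as a hypothetical helper (the enum mirrors `ChangelogReaderState` but is a local stand-in), it amounts to:

```java
import java.time.Duration;

// Hypothetical helper isolating the removed poll-timeout choice.
final class RestorePollTimeout {
    // Local stand-in mirroring ChangelogReaderState.
    enum ReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

    private RestorePollTimeout() { }

    // Standby updating polls without blocking so the thread gets back to the
    // main consumer's poll in time; active restoration may block up to pollTime.
    static Duration pollTimeout(final ReaderState state, final Duration pollTime) {
        return state == ReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime;
    }
}
```

As the removed comment put it, the different poll times were there "to make sure we call the main consumer#poll in time"; if standbys stay on the StreamThread, that concern presumably still applies to the standby path.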

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final StreamsConfig config;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final ConcurrentLinkedDeque<TaskCorruptedException> corruptedExceptions;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+        this.time = time;
+        this.config = config;
+        this.changelogReader = changelogReader;
+        this.corruptedExceptions = new ConcurrentLinkedDeque<>();
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+        this.log = logContext.logger(getClass());
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                final long startMs = time.milliseconds();
+
+                try {
+                    // try to restore some changelogs, if there's nothing to restore it would wait inside this call
+                    final int numRestored = changelogReader.restore();
+                    // TODO: we should record the restoration related metrics
+                    log.info("Restored {} records in {} ms", numRestored, time.milliseconds() - startMs);
+                } catch (final TaskCorruptedException e) {
+                    log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
+                            "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+
+                    // remove corrupted partitions from the changelog reader and continue; we can still proceed
+                    // and restore other partitions until the main thread comes to handle this exception
+                    changelogReader.unregister(e.corruptedTaskWithChangelogs().values().stream()

Review comment:
       I'm not sure this is strictly incorrect, since unregistering is idempotent, but it certainly seems unnecessary. Why do we need to unregister the changelogs here? We already do so in `ProcessorStateManager`'s `close`, which should be called in `handleCorruption` from the main thread.
   I'm also generally against unregistering something so far away from where it gets registered 🙂 
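A sketch of keeping registration and cleanup in one owner, as suggested above, with hypothetical `ChangelogRegistrations` and `StateManagerSketch` classes standing in for the changelog reader and `ProcessorStateManager`. Unregistering here is a `Set.removeAll`, so calling it twice is harmless, mirroring the point that an extra unregister call is redundant rather than wrong. The sketch is not thread-safe; a reader shared with a restore thread would need synchronization.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the changelog reader's registration bookkeeping.
final class ChangelogRegistrations {
    private final Set<String> registered = new HashSet<>();

    void register(final Collection<String> partitions) {
        registered.addAll(partitions);
    }

    // Idempotent: removing partitions that are already gone is a no-op.
    void unregister(final Collection<String> partitions) {
        registered.removeAll(partitions);
    }

    boolean isRegistered(final String partition) {
        return registered.contains(partition);
    }
}

// Hypothetical stand-in for a state manager that owns its changelogs'
// registration for its whole lifetime.
final class StateManagerSketch implements AutoCloseable {
    private final ChangelogRegistrations registrations;
    private final List<String> changelogs;

    StateManagerSketch(final ChangelogRegistrations registrations, final List<String> changelogs) {
        this.registrations = registrations;
        this.changelogs = List.copyOf(changelogs);
        registrations.register(this.changelogs); // registered by this owner...
    }

    @Override
    public void close() {
        registrations.unregister(changelogs);    // ...and unregistered by the same owner
    }
}
```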




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org