Posted to commits@kafka.apache.org by ab...@apache.org on 2022/01/25 18:39:57 UTC

[kafka] branch trunk updated: KAFKA-12648: invoke exception handler for MissingSourceTopicException with named topologies (#11686)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9d602a0  KAFKA-12648: invoke exception handler for MissingSourceTopicException with named topologies (#11686)
9d602a0 is described below

commit 9d602a01beb0a9467c43b9ee0b80ff53fdd2dbb4
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Jan 25 10:37:35 2022 -0800

    KAFKA-12648: invoke exception handler for MissingSourceTopicException with named topologies (#11686)
    
    Follow-up to #11600: invoke the streams uncaught exception handler on the MissingSourceTopicException, without killing/replacing the thread
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Bruno Cadonna <ca...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  40 +--
 .../streams/processor/internals/ClientUtils.java   |   5 +
 .../processor/internals/RepartitionTopics.java     |  95 ++++---
 .../streams/processor/internals/StreamThread.java  |  25 +-
 .../internals/StreamsPartitionAssignor.java        |  11 +-
 .../processor/internals/TopologyMetadata.java      |  29 ++-
 .../internals/assignment/ReferenceContainer.java   |   4 +
 .../integration/NamedTopologyIntegrationTest.java  |  94 ++++++-
 .../processor/internals/RepartitionTopicsTest.java |  29 ++-
 .../processor/internals/StreamThreadTest.java      | 280 +++++----------------
 10 files changed, 314 insertions(+), 298 deletions(-)
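
For context: the user-visible effect of this change is that the StreamsUncaughtExceptionHandler is now
invoked for a MissingSourceTopicException hit by a named topology, and a REPLACE_THREAD response is
treated as a no-op for this recoverable case instead of tearing the thread down. A minimal sketch of
such a handler (illustrative only, not part of this commit; `builder` and `props` are assumed to be
defined elsewhere):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.errors.MissingSourceTopicException;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
    // must be registered before start(); the handler is shared by all stream threads
    streams.setUncaughtExceptionHandler(exception -> {
        if (exception.getCause() instanceof MissingSourceTopicException) {
            // recoverable with named topologies: a later rebalance retries the topology
            // once the missing input topics appear, and the thread is left alone
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        }
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    });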

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ad56c7e..dc08d91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -73,11 +73,13 @@ import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
+
 import org.slf4j.Logger;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.function.BiConsumer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -182,7 +184,7 @@ public class KafkaStreams implements AutoCloseable {
     private KafkaStreams.StateListener stateListener;
     private StateRestoreListener globalStateRestoreListener;
     private boolean oldHandler;
-    private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
+    private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
     private final Object changeThreadCount = new Object();
 
     // container states
@@ -452,19 +454,22 @@ public class KafkaStreams implements AutoCloseable {
      * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
      * thread that encounters such an exception.
      *
-     * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+     * @param userStreamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
      * @throws IllegalStateException if this {@code KafkaStreams} instance has already been started.
-     * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+     * @throws NullPointerException if userStreamsUncaughtExceptionHandler is null.
      */
-    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
-        final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler userStreamsUncaughtExceptionHandler) {
         synchronized (stateLock) {
             if (state.hasNotStarted()) {
-                this.streamsUncaughtExceptionHandler = handler;
-                Objects.requireNonNull(streamsUncaughtExceptionHandler);
-                processStreamThread(thread -> thread.setStreamsUncaughtExceptionHandler(handler));
+                Objects.requireNonNull(userStreamsUncaughtExceptionHandler);
+                streamsUncaughtExceptionHandler =
+                    (exception, skipThreadReplacement) ->
+                        handleStreamsUncaughtException(exception, userStreamsUncaughtExceptionHandler, skipThreadReplacement);
+                processStreamThread(thread -> thread.setStreamsUncaughtExceptionHandler(streamsUncaughtExceptionHandler));
                 if (globalStreamThread != null) {
-                    globalStreamThread.setUncaughtExceptionHandler(handler);
+                    globalStreamThread.setUncaughtExceptionHandler(
+                        exception -> handleStreamsUncaughtException(exception, userStreamsUncaughtExceptionHandler, false)
+                    );
                 }
             } else {
                 throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). " +
@@ -473,7 +478,7 @@ public class KafkaStreams implements AutoCloseable {
         }
     }
 
-    private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
+    private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable, final boolean skipThreadReplacement) {
         if (oldHandler) {
             threads.remove(Thread.currentThread());
             if (throwable instanceof RuntimeException) {
@@ -484,7 +489,7 @@ public class KafkaStreams implements AutoCloseable {
                 throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
             }
         } else {
-            handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT);
+            handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT, skipThreadReplacement);
         }
     }
 
@@ -523,7 +528,8 @@ public class KafkaStreams implements AutoCloseable {
     }
 
     private void handleStreamsUncaughtException(final Throwable throwable,
-                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
+                                                final boolean skipThreadReplacement) {
         final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
         if (oldHandler) {
             log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
@@ -531,8 +537,12 @@ public class KafkaStreams implements AutoCloseable {
         }
         switch (action) {
             case REPLACE_THREAD:
-                log.error("Replacing thread in the streams uncaught exception handler", throwable);
-                replaceStreamThread(throwable);
+                if (!skipThreadReplacement) {
+                    log.error("Replacing thread in the streams uncaught exception handler", throwable);
+                    replaceStreamThread(throwable);
+                } else {
+                    log.debug("Skipping thread replacement for recoverable error");
+                }
                 break;
             case SHUTDOWN_CLIENT:
                 log.error("Encountered the following exception during processing " +
@@ -940,7 +950,7 @@ public class KafkaStreams implements AutoCloseable {
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                streamsUncaughtExceptionHandler
+                exception -> defaultStreamsUncaughtExceptionHandler(exception, false)
             );
             globalThreadState = globalStreamThread.state();
         }
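
The KafkaStreams changes above boil down to threading a skipThreadReplacement flag through the
handler, roughly as in this condensed sketch (illustrative; `userHandler` and `takeAction` are
hypothetical stand-ins for the real members):

    import java.util.function.BiConsumer;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    final BiConsumer<Throwable, Boolean> handler = (exception, skipThreadReplacement) -> {
        final StreamThreadExceptionResponse action = userHandler.handle(exception);
        if (action == StreamThreadExceptionResponse.REPLACE_THREAD && skipThreadReplacement) {
            // recoverable error surfaced during a rebalance: the user handler has been
            // invoked for visibility, but the thread keeps running
            return;
        }
        takeAction(action, exception); // replace the thread, shut down the client, etc.
    };
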
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
index a5807b5..b47b68f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -161,4 +161,9 @@ public class ClientUtils {
         }
         return getEndOffsets(fetchEndOffsetsFuture(partitions, adminClient));
     }
+
+    public static String extractThreadId(final String fullThreadName) {
+        final int index = fullThreadName.indexOf("StreamThread-");
+        return fullThreadName.substring(index);
+    }
 }
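
For reference, the new helper simply strips everything before the "StreamThread-" suffix of a full
thread name; a usage sketch follows (illustrative input; note, as an observation rather than
something the commit guards against, that an input without "StreamThread-" would make indexOf
return -1 and substring(-1) throw):

    // e.g. a full thread name of the form "<client-id>-StreamThread-<n>"
    final String threadId = ClientUtils.extractThreadId("myapp-1a2b3c-StreamThread-2");
    // threadId is now "StreamThread-2"
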
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
index 2951451..44be19e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
@@ -21,9 +21,16 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
 import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
+
+import java.util.LinkedList;
+import java.util.Queue;
 import org.slf4j.Logger;
 
 import java.util.Collection;
@@ -35,6 +42,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.getTopologyNameOrElseUnnamed;
+
 public class RepartitionTopics {
 
     private final InternalTopicManager internalTopicManager;
@@ -43,7 +52,7 @@ public class RepartitionTopics {
     private final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
     private final Logger log;
     private final Map<TopicPartition, PartitionInfo> topicPartitionInfos = new HashMap<>();
-    private final Map<String, Set<String>> missingUserInputTopicsPerTopology = new HashMap<>();
+    private final Map<Subtopology, Set<String>> missingInputTopicsBySubtopology = new HashMap<>();
 
     public RepartitionTopics(final TopologyMetadata topologyMetadata,
                              final InternalTopicManager internalTopicManager,
@@ -58,16 +67,11 @@ public class RepartitionTopics {
         log = logContext.logger(getClass());
     }
 
-    /**
-     * @return   true iff setup was completed successfully and all user input topics were verified to exist
-     */
-    public boolean setup() {
-        final Map<String, Collection<TopicsInfo>> topicGroups = topologyMetadata.topicGroupsByTopology();
-        final Map<String, InternalTopicConfig> repartitionTopicMetadata
-            = computeRepartitionTopicConfig(topicGroups, clusterMetadata);
+    public void setup() {
+        final Map<String, InternalTopicConfig> repartitionTopicMetadata = computeRepartitionTopicConfig(clusterMetadata);
 
         if (repartitionTopicMetadata.isEmpty()) {
-            if (missingUserInputTopicsPerTopology.isEmpty()) {
+            if (missingInputTopicsBySubtopology.isEmpty()) {
                 log.info("Skipping the repartition topic validation since there are no repartition topics.");
             } else {
                 log.info("Skipping the repartition topic validation since all topologies containing repartition"
@@ -97,12 +101,27 @@ public class RepartitionTopics {
                 }
             }
         }
+    }
 
-        return missingUserInputTopicsPerTopology.isEmpty();
+    public Set<String> topologiesWithMissingInputTopics() {
+        return missingInputTopicsBySubtopology.keySet()
+            .stream()
+            .map(s -> getTopologyNameOrElseUnnamed(s.namedTopology))
+            .collect(Collectors.toSet());
     }
 
-    public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
-        return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
+    public Queue<StreamsException> missingSourceTopicExceptions() {
+        return missingInputTopicsBySubtopology.entrySet().stream().map(entry -> {
+            final Set<String> missingSourceTopics = entry.getValue();
+            final int subtopologyId = entry.getKey().nodeGroupId;
+            final String topologyName = entry.getKey().namedTopology;
+
+            return new StreamsException(
+                new MissingSourceTopicException(String.format(
+                    "Missing source topics %s for subtopology %s of topology %s",
+                    missingSourceTopics, subtopologyId, topologyName)),
+                new TaskId(subtopologyId, 0, topologyName));
+        }).collect(Collectors.toCollection(LinkedList::new));
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
@@ -110,36 +129,46 @@ public class RepartitionTopics {
     }
 
     /**
-     * @param topicGroups                            information about the topic groups (subtopologies) in this application
-     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     * @param clusterMetadata  cluster metadata, eg which topics exist on the brokers
      */
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
-                                                                           final Cluster clusterMetadata) {
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Cluster clusterMetadata) {
         final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
         final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();
-        for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) {
-            final String topologyName = topology.getKey();
-            final Set<String> missingSourceTopics = new HashSet<>();
-            final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>();
-            for (final TopicsInfo topicsInfo : topology.getValue()) {
-                missingSourceTopics.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata));
-                repartitionTopicConfigsPerTopology.putAll(
+        for (final Map.Entry<String, Map<Subtopology, TopicsInfo>> topologyEntry : topologyMetadata.topologyToSubtopologyTopicsInfoMap().entrySet()) {
+            final String topologyName = topologyMetadata.hasNamedTopologies() ? topologyEntry.getKey() : null;
+
+            final Set<TopicsInfo> topicsInfoForTopology = new HashSet<>();
+            final Set<String> missingSourceTopicsForTopology = new HashSet<>();
+            final Map<String, InternalTopicConfig> repartitionTopicConfigsForTopology = new HashMap<>();
+
+            for (final Map.Entry<Subtopology, TopicsInfo> subtopologyEntry : topologyEntry.getValue().entrySet()) {
+                final TopicsInfo topicsInfo = subtopologyEntry.getValue();
+
+                topicsInfoForTopology.add(topicsInfo);
+                repartitionTopicConfigsForTopology.putAll(
                     topicsInfo.repartitionSourceTopics
                         .values()
                         .stream()
                         .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+
+                final Set<String> missingSourceTopicsForSubtopology = computeMissingExternalSourceTopics(topicsInfo, clusterMetadata);
+                missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);
+                if (!missingSourceTopicsForSubtopology.isEmpty()) {
+                    missingInputTopicsBySubtopology.put(subtopologyEntry.getKey(), missingSourceTopicsForSubtopology);
+                    log.error("Subtopology {} of topology {} was missing source topics {} and will be excluded from the "
+                        + "current assignment, this can be due to the consumer client's metadata being stale or because "
+                        + "they have not been created yet. Please verify that you have created all input topics; if they "
+                        + "do exist, you just need to wait for the metadata to be updated, at which time a new rebalance "
+                        + "will be kicked off automatically and the topology will be retried at that time.",
+                        subtopologyEntry.getKey().nodeGroupId, topologyName, missingSourceTopicsForSubtopology);
+                }
             }
-            if (missingSourceTopics.isEmpty()) {
-                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
-                allTopicsInfo.addAll(topology.getValue());
+            if (missingSourceTopicsForTopology.isEmpty()) {
+                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsForTopology);
+                allTopicsInfo.addAll(topicsInfoForTopology);
             } else {
-                missingUserInputTopicsPerTopology.put(topologyName, missingSourceTopics);
-                log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, "
-                              + "this can be due to the consumer client's metadata being stale or because they have "
-                              + "not been created yet. Please verify that you have created all input topics; if they "
-                              + "do exist, you just need to wait for the metadata to be updated, at which time a new "
-                              + "rebalance will be kicked off automatically and the topology will be retried at that time."
-                              + topologyName, missingSourceTopics);
+                log.debug("Skipping repartition topic validation for entire topology {} due to missing source topics {}",
+                    topologyName, missingSourceTopicsForTopology);
             }
         }
         setRepartitionSourceTopicPartitionCount(allRepartitionTopicConfigs, allTopicsInfo, clusterMetadata);
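
Each entry produced by missingSourceTopicExceptions() above carries a TaskId (subtopology id plus
topology name), so downstream code can attribute the wrapped MissingSourceTopicException to its
named topology, along these lines (illustrative sketch):

    import org.apache.kafka.streams.errors.StreamsException;

    final StreamsException wrapped = repartitionTopics.missingSourceTopicExceptions().poll();
    if (wrapped != null && wrapped.taskId().isPresent()) {
        // the cause is the MissingSourceTopicException; the TaskId pins partition 0 of
        // the affected subtopology so the error can be routed by topology name
        final String topologyName = wrapped.taskId().get().topologyName();
    }
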
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a8d2c18..fa3e85b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -47,6 +47,9 @@ import org.apache.kafka.streams.processor.internals.assignment.ReferenceContaine
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.util.Queue;
+import java.util.function.BiConsumer;
 import org.slf4j.Logger;
 
 import java.time.Duration;
@@ -298,12 +301,15 @@ public class StreamThread extends Thread {
     private final TopologyMetadata topologyMetadata;
     private final java.util.function.Consumer<Long> cacheResizer;
 
-    private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
+    private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
     private final Runnable shutdownErrorHook;
 
     // These must be Atomic references as they are shared and used to signal between the assignor and the stream thread
     private final AtomicInteger assignmentErrorCode;
     private final AtomicLong nextProbingRebalanceMs;
+    // recoverable errors (don't require killing thread) that we need to invoke the exception
+    // handler for, eg MissingSourceTopicException with named topologies
+    private final Queue<StreamsException> nonFatalExceptionsToHandle;
 
     // These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
@@ -324,7 +330,7 @@ public class StreamThread extends Thread {
                                       final StateRestoreListener userStateRestoreListener,
                                       final int threadIdx,
                                       final Runnable shutdownErrorHook,
-                                      final java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler) {
+                                      final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
         final String threadId = clientId + "-StreamThread-" + threadIdx;
 
         final String logPrefix = String.format("stream-thread [%s] ", threadId);
@@ -419,6 +425,7 @@ public class StreamThread extends Thread {
             logContext,
             referenceContainer.assignmentErrorCode,
             referenceContainer.nextScheduledRebalanceMs,
+            referenceContainer.nonFatalExceptionsToHandle,
             shutdownErrorHook,
             streamsUncaughtExceptionHandler,
             cache::resize
@@ -479,8 +486,9 @@ public class StreamThread extends Thread {
                         final LogContext logContext,
                         final AtomicInteger assignmentErrorCode,
                         final AtomicLong nextProbingRebalanceMs,
+                        final Queue<StreamsException> nonFatalExceptionsToHandle,
                         final Runnable shutdownErrorHook,
-                        final java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler,
+                        final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler,
                         final java.util.function.Consumer<Long> cacheResizer) {
         super(threadId);
         this.stateLock = new Object();
@@ -538,6 +546,7 @@ public class StreamThread extends Thread {
         this.changelogReader = changelogReader;
         this.originalReset = originalReset;
         this.nextProbingRebalanceMs = nextProbingRebalanceMs;
+        this.nonFatalExceptionsToHandle = nonFatalExceptionsToHandle;
         this.getGroupInstanceID = mainConsumer.groupMetadata().groupInstanceId();
 
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
@@ -575,7 +584,7 @@ public class StreamThread extends Thread {
             cleanRun = runLoop();
         } catch (final Throwable e) {
             failedStreamThreadSensor.record();
-            this.streamsUncaughtExceptionHandler.accept(e);
+            this.streamsUncaughtExceptionHandler.accept(e, false);
         } finally {
             completeShutdown(cleanRun);
         }
@@ -641,7 +650,7 @@ public class StreamThread extends Thread {
                           StreamsConfig.EXACTLY_ONCE_V2, StreamsConfig.EXACTLY_ONCE_BETA);
                 }
                 failedStreamThreadSensor.record();
-                this.streamsUncaughtExceptionHandler.accept(new StreamsException(e));
+                this.streamsUncaughtExceptionHandler.accept(new StreamsException(e), false);
                 return false;
             } catch (final StreamsException e) {
                 throw e;
@@ -657,7 +666,7 @@ public class StreamThread extends Thread {
      *
      * @param streamsUncaughtExceptionHandler the user handler wrapped in shell to execute the action
      */
-    public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler) {
+    public void setStreamsUncaughtExceptionHandler(final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
@@ -969,6 +978,10 @@ public class StreamThread extends Thread {
             pollRecordsSensor.record(numRecords, now);
             taskManager.addRecordsToTasks(records);
         }
+
+        while (!nonFatalExceptionsToHandle.isEmpty()) {
+            streamsUncaughtExceptionHandler.accept(nonFatalExceptionsToHandle.poll(), true);
+        }
         return pollLatency;
     }
 
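A design note on the queue introduced above: both ends of the handoff run on the stream thread
itself, since the assignor's callbacks execute inside the consumer's poll(), which is why the
unsynchronized LinkedList backing nonFatalExceptionsToHandle is safe here. A condensed, hypothetical
sketch of the handoff:

    import java.util.LinkedList;
    import java.util.Queue;
    import org.apache.kafka.streams.errors.StreamsException;

    final Queue<StreamsException> deferred = new LinkedList<>(); // shared via the ReferenceContainer

    // during assignment (still on the polling thread, inside the consumer's poll()):
    deferred.addAll(repartitionTopics.missingSourceTopicExceptions());

    // right after poll() returns, on the same thread, in pollPhase():
    while (!deferred.isEmpty()) {
        // 'true' means skip thread replacement: the handler runs, the thread survives
        streamsUncaughtExceptionHandler.accept(deferred.poll(), true);
    }
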
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index bcfa94b..80c506c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -178,6 +178,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
     private PartitionGrouper partitionGrouper;
     private AtomicInteger assignmentErrorCode;
     private AtomicLong nextScheduledRebalanceMs;
+    private Queue<StreamsException> nonFatalExceptionsToHandle;
     private Time time;
 
     protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
@@ -212,6 +213,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         streamsMetadataState = Objects.requireNonNull(referenceContainer.streamsMetadataState, "StreamsMetadataState was not specified");
         assignmentErrorCode = referenceContainer.assignmentErrorCode;
         nextScheduledRebalanceMs = referenceContainer.nextScheduledRebalanceMs;
+        nonFatalExceptionsToHandle = referenceContainer.nonFatalExceptionsToHandle;
         time = Objects.requireNonNull(referenceContainer.time, "Time was not specified");
         assignmentConfigs = assignorConfiguration.assignmentConfigs();
         partitionGrouper = new PartitionGrouper();
@@ -382,7 +384,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             // the maximum of the depending sub-topologies source topics' number of partitions
             final RepartitionTopics repartitionTopics = prepareRepartitionTopics(metadata);
             final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();
-            final Map<String, Set<String>> missingUserInputTopicsPerTopology = repartitionTopics.missingUserInputTopicsPerTopology();
 
             final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
             log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
@@ -392,7 +393,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             // construct the assignment of tasks to clients
 
             final Map<Subtopology, TopicsInfo> topicGroups =
-                taskManager.topologyMetadata().subtopologyTopicsInfoMapExcluding(missingUserInputTopicsPerTopology.keySet());
+                taskManager.topologyMetadata().subtopologyTopicsInfoMapExcluding(repartitionTopics.topologiesWithMissingInputTopics());
 
             final Set<String> allSourceTopics = new HashSet<>();
             final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
@@ -511,15 +512,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             metadata,
             logPrefix
         );
-        final boolean isMissingInputTopics = !repartitionTopics.setup();
+        repartitionTopics.setup();
+        final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
                 throw new MissingSourceTopicException("Missing source topics.");
+            } else {
+                nonFatalExceptionsToHandle.addAll(repartitionTopics.missingSourceTopicExceptions());
             }
         }
         return repartitionTopics;
     }
 
+
     /**
      * Populates the taskForPartition and tasksForTopicGroup maps, and checks that partitions are assigned to exactly
      * one task.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index c3da7ce..3ff6e66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -450,34 +450,33 @@ public class TopologyMetadata {
         return lookupBuilderForNamedTopology(topologyName).sourceTopicsForStore(storeName);
     }
 
-    private String getTopologyNameOrElseUnnamed(final String topologyName) {
+    public static String getTopologyNameOrElseUnnamed(final String topologyName) {
         return topologyName == null ? UNNAMED_TOPOLOGY : topologyName;
     }
 
     /**
      * @param topologiesToExclude the names of any topologies to exclude from the returned topic groups,
      *                            eg because they have missing source topics and can't be processed yet
+     *
+     * @return                    flattened map of all subtopologies (from all topologies) to topics info
      */
     public Map<Subtopology, TopicsInfo> subtopologyTopicsInfoMapExcluding(final Set<String> topologiesToExclude) {
-        final Map<Subtopology, TopicsInfo> topicGroups = new HashMap<>();
-        for (final InternalTopologyBuilder builder : builders.values()) {
-            if (!topologiesToExclude.contains(builder.topologyName())) {
-                topicGroups.putAll(builder.subtopologyToTopicsInfo());
+        final Map<Subtopology, TopicsInfo> subtopologyTopicsInfo = new HashMap<>();
+        applyToEachBuilder(b -> {
+            if (!topologiesToExclude.contains(b.topologyName())) {
+                subtopologyTopicsInfo.putAll(b.subtopologyToTopicsInfo());
             }
-        }
-        return topicGroups;
+        });
+        return subtopologyTopicsInfo;
     }
 
     /**
-     * @return    map from topologies with missing external source topics to the set of missing topic names,
-     *            keyed by topology name or
+     * @return    map from topology to its subtopologies and their topics info
      */
-    public Map<String, Collection<TopicsInfo>> topicGroupsByTopology() {
-        final Map<String, Collection<TopicsInfo>> topicGroups = new HashMap<>();
-        applyToEachBuilder(
-            b -> topicGroups.put(getTopologyNameOrElseUnnamed(b.topologyName()), b.subtopologyToTopicsInfo().values())
-        );
-        return topicGroups;
+    public Map<String, Map<Subtopology, TopicsInfo>> topologyToSubtopologyTopicsInfoMap() {
+        final Map<String, Map<Subtopology, TopicsInfo>> topologyToSubtopologyTopicsInfoMap = new HashMap<>();
+        applyToEachBuilder(b -> topologyToSubtopologyTopicsInfoMap.put(b.topologyName(), b.subtopologyToTopicsInfo()));
+        return topologyToSubtopologyTopicsInfoMap;
     }
 
     public Map<String, List<String>> nodeToSourceTopics(final TaskId task) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java
index fbf65e5..9b46eeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java
@@ -19,9 +19,12 @@ package org.apache.kafka.streams.processor.internals.assignment;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.processor.internals.TaskManager;
 
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -32,5 +35,6 @@ public class ReferenceContainer {
     public StreamsMetadataState streamsMetadataState;
     public final AtomicInteger assignmentErrorCode = new AtomicInteger();
     public final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
+    public final Queue<StreamsException> nonFatalExceptionsToHandle = new LinkedList<>();
     public Time time;
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 9ce39fd..73b4085 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -31,6 +31,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.LagInfo;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -52,6 +55,10 @@ import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
 import org.apache.kafka.test.TestUtils;
 
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -63,7 +70,6 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -76,10 +82,14 @@ import static org.apache.kafka.streams.KeyValue.pair;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.extractThreadId;
+import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
@@ -183,7 +193,6 @@ public class NamedTopologyIntegrationTest {
     // builders for the 2nd Streams instance
     private NamedTopologyBuilder topology1Builder2;
     private NamedTopologyBuilder topology2Builder2;
-    private NamedTopologyBuilder topology3Builder2;
 
     private Properties configProps(final String appId, final String host) {
         final Properties streamsConfiguration = new Properties();
@@ -226,7 +235,6 @@ public class NamedTopologyIntegrationTest {
         streams2 = new KafkaStreamsNamedTopologyWrapper(props2, clientSupplier);
         topology1Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_1);
         topology2Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_2);
-        topology3Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_3);
     }
 
     @After
@@ -666,6 +674,10 @@ public class NamedTopologyIntegrationTest {
             topology1Builder.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
             topology1Builder2.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
 
+            final TrackingExceptionHandler handler = new TrackingExceptionHandler();
+            streams.setUncaughtExceptionHandler(handler);
+            streams2.setUncaughtExceptionHandler(handler);
+
             streams.start(topology1Builder.build());
             streams2.start(topology1Builder2.build());
             waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
@@ -674,9 +686,18 @@ public class NamedTopologyIntegrationTest {
             topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
             topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
 
+            assertThat(handler.nextError(TOPOLOGY_2), nullValue());
+
             streams.addNamedTopology(topology2Builder.build());
             streams2.addNamedTopology(topology2Builder2.build());
 
+            // verify that the missing source topics were noticed and the handler invoked
+            retryOnExceptionWithTimeout(() -> {
+                final Throwable error = handler.nextError(TOPOLOGY_2);
+                assertThat(error, notNullValue());
+                assertThat(error.getCause().getClass(), is(MissingSourceTopicException.class));
+            });
+
             // make sure the original topology can continue processing while waiting on the new source topics
             produceToInputTopics(EXISTING_STREAM, singletonList(pair("A", 30L)));
             assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 1), equalTo(singletonList(pair("A", 3L))));
@@ -684,6 +705,22 @@ public class NamedTopologyIntegrationTest {
             CLUSTER.createTopic(NEW_STREAM, 2, 1);
             produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
             assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+
+            // Make sure the threads were not actually killed and replaced
+            assertThat(streams.metadataForLocalThreads().size(), equalTo(2));
+            assertThat(streams2.metadataForLocalThreads().size(), equalTo(2));
+
+            final Set<String> localThreadsNames = streams.metadataForLocalThreads().stream()
+                .map(t -> extractThreadId(t.threadName()))
+                .collect(Collectors.toSet());
+            final Set<String> localThreadsNames2 = streams2.metadataForLocalThreads().stream()
+                .map(t -> extractThreadId(t.threadName()))
+                .collect(Collectors.toSet());
+
+            assertThat(localThreadsNames.contains("StreamThread-1"), is(true));
+            assertThat(localThreadsNames.contains("StreamThread-2"), is(true));
+            assertThat(localThreadsNames2.contains("StreamThread-1"), is(true));
+            assertThat(localThreadsNames2.contains("StreamThread-2"), is(true));
         } finally {
             CLUSTER.deleteTopicsAndWait(EXISTING_STREAM, NEW_STREAM);
         }
@@ -695,15 +732,41 @@ public class NamedTopologyIntegrationTest {
         topology1Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
         topology1Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
 
+        final TrackingExceptionHandler handler = new TrackingExceptionHandler();
+        streams.setUncaughtExceptionHandler(handler);
+        streams2.setUncaughtExceptionHandler(handler);
+
         streams.start(topology1Builder.build());
         streams2.start(topology1Builder2.build());
         waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
 
+        retryOnExceptionWithTimeout(() -> {
+            final Throwable error = handler.nextError(TOPOLOGY_1);
+            assertThat(error, notNullValue());
+            assertThat(error.getCause().getClass(), is(MissingSourceTopicException.class));
+        });
+
         try {
             CLUSTER.createTopic(NEW_STREAM, 2, 1);
             produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
 
             assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
+            // Make sure the threads were not actually killed and replaced
+            assertThat(streams.metadataForLocalThreads().size(), equalTo(2));
+            assertThat(streams2.metadataForLocalThreads().size(), equalTo(2));
+
+            final Set<String> localThreadsNames = streams.metadataForLocalThreads().stream()
+                .map(t -> extractThreadId(t.threadName()))
+                .collect(Collectors.toSet());
+            final Set<String> localThreadsNames2 = streams2.metadataForLocalThreads().stream()
+                .map(t -> extractThreadId(t.threadName()))
+                .collect(Collectors.toSet());
+
+            assertThat(localThreadsNames.contains("StreamThread-1"), is(true));
+            assertThat(localThreadsNames.contains("StreamThread-2"), is(true));
+            assertThat(localThreadsNames2.contains("StreamThread-1"), is(true));
+            assertThat(localThreadsNames2.contains("StreamThread-2"), is(true));
         } finally {
             CLUSTER.deleteTopicsAndWait(NEW_STREAM);
         }
@@ -778,4 +841,29 @@ public class NamedTopologyIntegrationTest {
             CLUSTER.time
         );
     }
+
+    private static class TrackingExceptionHandler implements StreamsUncaughtExceptionHandler {
+        private final Map<String, Queue<Throwable>> newErrorsByTopology = new HashMap<>();
+
+        @Override
+        public synchronized StreamThreadExceptionResponse handle(final Throwable exception) {
+            final String topologyName =
+                exception instanceof StreamsException && ((StreamsException) exception).taskId().isPresent() ?
+                    ((StreamsException) exception).taskId().get().topologyName()
+                    : null;
+
+            newErrorsByTopology.computeIfAbsent(topologyName, t -> new LinkedList<>()).add(exception);
+            if (exception.getCause() instanceof MissingSourceTopicException) {
+                return StreamThreadExceptionResponse.REPLACE_THREAD;
+            } else {
+                return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+            }
+        }
+
+        public synchronized Throwable nextError(final String topologyName) {
+            return newErrorsByTopology.containsKey(topologyName) ?
+                newErrorsByTopology.get(topologyName).poll() :
+                null;
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index bacab10..5a020d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -21,7 +21,9 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
 import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
 import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
@@ -54,6 +56,7 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
@@ -145,6 +148,9 @@ public class RepartitionTopicsTest {
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_TOPIC_NAME1, 3);
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_TOPIC_NAME2, 0);
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_TOPIC_NAME2, 1);
+
+        assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+        assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
     }
 
     @Test
@@ -168,12 +174,16 @@ public class RepartitionTopicsTest {
             clusterMetadata,
             "[test] "
         );
+        repartitionTopics.setup();
 
-        assertThat(repartitionTopics.setup(), equalTo(false));
         assertThat(
-            repartitionTopics.missingUserInputTopicsPerTopology(),
-            equalTo(Collections.singletonMap(UNNAMED_TOPOLOGY, missingSourceTopics))
+            repartitionTopics.topologiesWithMissingInputTopics(),
+            equalTo(Collections.singleton(UNNAMED_TOPOLOGY))
         );
+        final StreamsException exception = repartitionTopics.missingSourceTopicExceptions().poll();
+        assertThat(exception, notNullValue());
+        assertThat(exception.taskId().isPresent(), is(true));
+        assertThat(exception.taskId().get(), equalTo(new TaskId(0, 0)));
     }
 
     @Test
@@ -204,6 +214,8 @@ public class RepartitionTopicsTest {
 
         final TaskAssignmentException exception = assertThrows(TaskAssignmentException.class, repartitionTopics::setup);
         assertThat(exception.getMessage(), is("Failed to compute number of partitions for all repartition topics, make sure all user input topics are created and all Pattern subscriptions match at least one topic in the cluster"));
+        assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+        assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
     }
 
     @Test
@@ -245,6 +257,8 @@ public class RepartitionTopicsTest {
             exception.getMessage(),
             is("No partition count found for source topic " + SOURCE_TOPIC_NAME1 + ", but it should have been.")
         );
+        assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+        assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
     }
 
     @Test
@@ -299,6 +313,9 @@ public class RepartitionTopicsTest {
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 0);
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 1);
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 2);
+
+        assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+        assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
     }
 
     @Test
@@ -354,6 +371,9 @@ public class RepartitionTopicsTest {
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 1);
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 2);
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 3);
+
+        assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+        assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
     }
 
     @Test
@@ -381,6 +401,9 @@ public class RepartitionTopicsTest {
         verify(internalTopicManager, internalTopologyBuilder);
         final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = repartitionTopics.topicPartitionsInfo();
         assertThat(topicPartitionsInfo, is(Collections.emptyMap()));
+
+        assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+        assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
     }
 
     private void verifyRepartitionTopicPartitionInfo(final Map<TopicPartition, PartitionInfo> topicPartitionsInfo,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 20fab01..5b29961 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -79,6 +79,8 @@ import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
+
+import java.util.function.BiConsumer;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
@@ -157,7 +159,7 @@ public class StreamThreadTest {
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
 
     private StreamsMetadataState streamsMetadataState;
-    private final static java.util.function.Consumer<Throwable> HANDLER = e -> {
+    private final static BiConsumer<Throwable, Boolean> HANDLER = (e, b) -> {
         if (e instanceof RuntimeException) {
             throw (RuntimeException) e;
         } else if (e instanceof Error) {
@@ -446,28 +448,9 @@ public class StreamThreadTest {
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
         EasyMock.replay(consumer, consumerGroupMetadata);
 
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        );
+        topologyMetadata.buildAndRewriteTopology();
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
         thread.setNow(mockTime.milliseconds());
         thread.maybeCommit();
         mockTime.sleep(commitInterval - 10L);
@@ -698,29 +681,10 @@ public class StreamThreadTest {
         EasyMock.replay(consumer, consumerGroupMetadata);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        );
+
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
         thread.setNow(mockTime.milliseconds());
         thread.maybeCommit();
         mockTime.sleep(commitInterval - 10L);
@@ -746,9 +710,6 @@ public class StreamThreadTest {
         expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
         EasyMock.replay(consumer, consumerGroupMetadata);
 
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
-
         final AtomicBoolean committed = new AtomicBoolean(false);
         final TaskManager taskManager = new TaskManager(
             null,
@@ -771,28 +732,9 @@ public class StreamThreadTest {
                 return 1;
             }
         };
-
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            changelogReader,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        );
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
 
         thread.setNow(mockTime.milliseconds());
         thread.maybeCommit();
@@ -863,25 +805,7 @@ public class StreamThreadTest {
         };
         taskManager.setMainConsumer(consumer);
 
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            changelogReader,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        );
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
         thread.updateThreadMetadata("adminClientId");
         thread.setState(StreamThread.State.STARTING);
 
@@ -1148,25 +1072,8 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
         thread.setStateListener(
             (t, newState, oldState) -> {
                 if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
@@ -1223,6 +1130,7 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger(),
             new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
             null,
             HANDLER,
             null
@@ -1255,25 +1163,8 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
         thread.shutdown();
         verify(taskManager);
     }
@@ -1295,25 +1186,8 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
         thread.shutdown();
         // Execute the run method. Verification of the mock will check that shutdown was only done once
         thread.run();
@@ -2205,25 +2079,8 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
 
         consumer.schedulePollTask(() -> {
             thread.setState(StreamThread.State.PARTITIONS_REVOKED);
@@ -2255,25 +2112,8 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+            .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
 
         consumer.schedulePollTask(() -> {
             thread.setState(StreamThread.State.PARTITIONS_REVOKED);
@@ -2333,6 +2173,7 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger(),
             new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
             null,
             HANDLER,
             null
@@ -2399,6 +2240,7 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger(),
             new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
             null,
             HANDLER,
             null
@@ -2412,7 +2254,7 @@ public class StreamThreadTest {
 
         final AtomicBoolean exceptionHandlerInvoked = new AtomicBoolean(false);
 
-        thread.setStreamsUncaughtExceptionHandler(e -> exceptionHandlerInvoked.set(true));
+        thread.setStreamsUncaughtExceptionHandler((e, b) -> exceptionHandlerInvoked.set(true));
         thread.run();
 
         verify(taskManager);
@@ -2473,6 +2315,7 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger(),
             new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
             null,
             HANDLER,
             null
@@ -2542,6 +2385,7 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger(),
             new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
             null,
             HANDLER,
             null
@@ -2609,6 +2453,7 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger(),
             new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
             null,
             HANDLER,
             null
@@ -2663,25 +2508,7 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        );
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
 
         EasyMock.replay(task1, task2, task3, taskManager);
 
@@ -2807,25 +2634,7 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            new StreamsConfig(configProps(true)),
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            topologyMetadata,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE),
-            null,
-            HANDLER,
-            null
-        );
+        final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
 
         assertThat(dummyProducerMetrics, is(thread.producerMetrics()));
     }
@@ -2865,6 +2674,7 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger(),
             new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
             null,
             HANDLER,
             null
@@ -2921,8 +2731,9 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger(),
             new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
             null,
-            e -> { },
+            (e, b) -> { },
             null
         ) {
             @Override
@@ -3016,4 +2827,33 @@ public class StreamThreadTest {
         }
         return null;
     }
+
+    private StreamThread buildStreamThread(final Consumer<byte[], byte[]> consumer,
+                                           final TaskManager taskManager,
+                                           final StreamsConfig config,
+                                           final TopologyMetadata topologyMetadata) {
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+
+        return new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            changelogReader,
+            null,
+            taskManager,
+            streamsMetrics,
+            topologyMetadata,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
+            null,
+            HANDLER,
+            null
+        );
+    }
 }
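
For readers following the test refactor above: the pattern is to collapse the repeated
many-argument StreamThread constructor calls into one private factory helper, so that
adding a new constructor parameter (here, the LinkedList of non-fatal exceptions handed
to the handler) touches a single place instead of every test. Below is a minimal,
self-contained sketch of that pattern. It is illustration only, not Kafka code: the
names Worker, buildWorker, and skipThreadReplacement are invented, and the meaning of
the Boolean passed to the two-argument handler is a presumption based on the commit
message (the handler is invoked for MissingSourceTopicException without replacing the
thread).

    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.function.BiConsumer;

    // Illustration only: a stand-in for a class with a wide constructor,
    // in the spirit of StreamThread.
    class Worker {
        private final String clientId;
        private final Queue<Exception> nonFatalExceptions;
        private final BiConsumer<Throwable, Boolean> exceptionHandler;

        Worker(final String clientId,
               final Queue<Exception> nonFatalExceptions,
               final BiConsumer<Throwable, Boolean> exceptionHandler) {
            this.clientId = clientId;
            this.nonFatalExceptions = nonFatalExceptions;
            this.exceptionHandler = exceptionHandler;
        }
    }

    public class WorkerTestSketch {
        // One private helper owns the constructor boilerplate, mirroring
        // buildStreamThread(...) in the diff: tests that do not care about a
        // parameter get a shared default, such as the empty exception queue.
        private static Worker buildWorker(final BiConsumer<Throwable, Boolean> handler) {
            return new Worker("client-1", new LinkedList<>(), handler);
        }

        public static void main(final String[] args) {
            // The two-argument lambda mirrors the new BiConsumer<Throwable, Boolean>
            // handler signature; the flag name below is an assumption, chosen to
            // reflect the commit's goal of handling the error without thread
            // replacement.
            final Worker worker = buildWorker((error, skipThreadReplacement) ->
                System.err.println("handled: " + error
                    + ", skip replacement=" + skipThreadReplacement));
        }
    }

With the helper in place, a later constructor change is a one-line edit to buildWorker
rather than a sweep across every test method, which is exactly the shape of the
StreamThreadTest diff above.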