You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/01/25 18:39:57 UTC
[kafka] branch trunk updated: KAFKA-12648: invoke exception handler for MissingSourceTopicException with named topologies (#11686)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9d602a0 KAFKA-12648: invoke exception handler for MissingSourceTopicException with named topologies (#11686)
9d602a0 is described below
commit 9d602a01beb0a9467c43b9ee0b80ff53fdd2dbb4
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Jan 25 10:37:35 2022 -0800
KAFKA-12648: invoke exception handler for MissingSourceTopicException with named topologies (#11686)
Follow-up to #11600 to invoke the streams exception handler on the MissingSourceTopicException, without killing/replacing the thread.
Reviewers: Guozhang Wang <gu...@confluent.io>, Bruno Cadonna <ca...@confluent.io>
---
.../org/apache/kafka/streams/KafkaStreams.java | 40 +--
.../streams/processor/internals/ClientUtils.java | 5 +
.../processor/internals/RepartitionTopics.java | 95 ++++---
.../streams/processor/internals/StreamThread.java | 25 +-
.../internals/StreamsPartitionAssignor.java | 11 +-
.../processor/internals/TopologyMetadata.java | 29 ++-
.../internals/assignment/ReferenceContainer.java | 4 +
.../integration/NamedTopologyIntegrationTest.java | 94 ++++++-
.../processor/internals/RepartitionTopicsTest.java | 29 ++-
.../processor/internals/StreamThreadTest.java | 280 +++++----------------
10 files changed, 314 insertions(+), 298 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ad56c7e..dc08d91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -73,11 +73,13 @@ import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
+
import org.slf4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.function.BiConsumer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -182,7 +184,7 @@ public class KafkaStreams implements AutoCloseable {
private KafkaStreams.StateListener stateListener;
private StateRestoreListener globalStateRestoreListener;
private boolean oldHandler;
- private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
+ private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
private final Object changeThreadCount = new Object();
// container states
@@ -452,19 +454,22 @@ public class KafkaStreams implements AutoCloseable {
* Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
* thread that encounters such an exception.
*
- * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @param userStreamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads
* @throws IllegalStateException if this {@code KafkaStreams} instance has already been started.
- * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ * @throws NullPointerException if userStreamsUncaughtExceptionHandler is null.
*/
- public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
- final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler userStreamsUncaughtExceptionHandler) {
synchronized (stateLock) {
if (state.hasNotStarted()) {
- this.streamsUncaughtExceptionHandler = handler;
- Objects.requireNonNull(streamsUncaughtExceptionHandler);
- processStreamThread(thread -> thread.setStreamsUncaughtExceptionHandler(handler));
+ Objects.requireNonNull(userStreamsUncaughtExceptionHandler);
+ streamsUncaughtExceptionHandler =
+ (exception, skipThreadReplacement) ->
+ handleStreamsUncaughtException(exception, userStreamsUncaughtExceptionHandler, skipThreadReplacement);
+ processStreamThread(thread -> thread.setStreamsUncaughtExceptionHandler(streamsUncaughtExceptionHandler));
if (globalStreamThread != null) {
- globalStreamThread.setUncaughtExceptionHandler(handler);
+ globalStreamThread.setUncaughtExceptionHandler(
+ exception -> handleStreamsUncaughtException(exception, userStreamsUncaughtExceptionHandler, false)
+ );
}
} else {
throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). " +
@@ -473,7 +478,7 @@ public class KafkaStreams implements AutoCloseable {
}
}
- private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
+ private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable, final boolean skipThreadReplacement) {
if (oldHandler) {
threads.remove(Thread.currentThread());
if (throwable instanceof RuntimeException) {
@@ -484,7 +489,7 @@ public class KafkaStreams implements AutoCloseable {
throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
}
} else {
- handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT);
+ handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT, skipThreadReplacement);
}
}
@@ -523,7 +528,8 @@ public class KafkaStreams implements AutoCloseable {
}
private void handleStreamsUncaughtException(final Throwable throwable,
- final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+ final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
+ final boolean skipThreadReplacement) {
final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
if (oldHandler) {
log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
@@ -531,8 +537,12 @@ public class KafkaStreams implements AutoCloseable {
}
switch (action) {
case REPLACE_THREAD:
- log.error("Replacing thread in the streams uncaught exception handler", throwable);
- replaceStreamThread(throwable);
+ if (!skipThreadReplacement) {
+ log.error("Replacing thread in the streams uncaught exception handler", throwable);
+ replaceStreamThread(throwable);
+ } else {
+ log.debug("Skipping thread replacement for recoverable error");
+ }
break;
case SHUTDOWN_CLIENT:
log.error("Encountered the following exception during processing " +
@@ -940,7 +950,7 @@ public class KafkaStreams implements AutoCloseable {
time,
globalThreadId,
delegatingStateRestoreListener,
- streamsUncaughtExceptionHandler
+ exception -> defaultStreamsUncaughtExceptionHandler(exception, false)
);
globalThreadState = globalStreamThread.state();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
index a5807b5..b47b68f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -161,4 +161,9 @@ public class ClientUtils {
}
return getEndOffsets(fetchEndOffsetsFuture(partitions, adminClient));
}
+
+ public static String extractThreadId(final String fullThreadName) {
+ final int index = fullThreadName.indexOf("StreamThread-");
+ return fullThreadName.substring(index);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
index 2951451..44be19e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
@@ -21,9 +21,16 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
+
+import java.util.LinkedList;
+import java.util.Queue;
import org.slf4j.Logger;
import java.util.Collection;
@@ -35,6 +42,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.getTopologyNameOrElseUnnamed;
+
public class RepartitionTopics {
private final InternalTopicManager internalTopicManager;
@@ -43,7 +52,7 @@ public class RepartitionTopics {
private final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
private final Logger log;
private final Map<TopicPartition, PartitionInfo> topicPartitionInfos = new HashMap<>();
- private final Map<String, Set<String>> missingUserInputTopicsPerTopology = new HashMap<>();
+ private final Map<Subtopology, Set<String>> missingInputTopicsBySubtopology = new HashMap<>();
public RepartitionTopics(final TopologyMetadata topologyMetadata,
final InternalTopicManager internalTopicManager,
@@ -58,16 +67,11 @@ public class RepartitionTopics {
log = logContext.logger(getClass());
}
- /**
- * @return true iff setup was completed successfully and all user input topics were verified to exist
- */
- public boolean setup() {
- final Map<String, Collection<TopicsInfo>> topicGroups = topologyMetadata.topicGroupsByTopology();
- final Map<String, InternalTopicConfig> repartitionTopicMetadata
- = computeRepartitionTopicConfig(topicGroups, clusterMetadata);
+ public void setup() {
+ final Map<String, InternalTopicConfig> repartitionTopicMetadata = computeRepartitionTopicConfig(clusterMetadata);
if (repartitionTopicMetadata.isEmpty()) {
- if (missingUserInputTopicsPerTopology.isEmpty()) {
+ if (missingInputTopicsBySubtopology.isEmpty()) {
log.info("Skipping the repartition topic validation since there are no repartition topics.");
} else {
log.info("Skipping the repartition topic validation since all topologies containing repartition"
@@ -97,12 +101,27 @@ public class RepartitionTopics {
}
}
}
+ }
- return missingUserInputTopicsPerTopology.isEmpty();
+ public Set<String> topologiesWithMissingInputTopics() {
+ return missingInputTopicsBySubtopology.keySet()
+ .stream()
+ .map(s -> getTopologyNameOrElseUnnamed(s.namedTopology))
+ .collect(Collectors.toSet());
}
- public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
- return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
+ public Queue<StreamsException> missingSourceTopicExceptions() {
+ return missingInputTopicsBySubtopology.entrySet().stream().map(entry -> {
+ final Set<String> missingSourceTopics = entry.getValue();
+ final int subtopologyId = entry.getKey().nodeGroupId;
+ final String topologyName = entry.getKey().namedTopology;
+
+ return new StreamsException(
+ new MissingSourceTopicException(String.format(
+ "Missing source topics %s for subtopology %s of topology %s",
+ missingSourceTopics, subtopologyId, topologyName)),
+ new TaskId(subtopologyId, 0, topologyName));
+ }).collect(Collectors.toCollection(LinkedList::new));
}
public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
@@ -110,36 +129,46 @@ public class RepartitionTopics {
}
/**
- * @param topicGroups information about the topic groups (subtopologies) in this application
- * @param clusterMetadata cluster metadata, eg which topics exist on the brokers
+ * @param clusterMetadata cluster metadata, eg which topics exist on the brokers
*/
- private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
- final Cluster clusterMetadata) {
+ private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Cluster clusterMetadata) {
final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();
- for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) {
- final String topologyName = topology.getKey();
- final Set<String> missingSourceTopics = new HashSet<>();
- final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>();
- for (final TopicsInfo topicsInfo : topology.getValue()) {
- missingSourceTopics.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata));
- repartitionTopicConfigsPerTopology.putAll(
+ for (final Map.Entry<String, Map<Subtopology, TopicsInfo>> topologyEntry : topologyMetadata.topologyToSubtopologyTopicsInfoMap().entrySet()) {
+ final String topologyName = topologyMetadata.hasNamedTopologies() ? topologyEntry.getKey() : null;
+
+ final Set<TopicsInfo> topicsInfoForTopology = new HashSet<>();
+ final Set<String> missingSourceTopicsForTopology = new HashSet<>();
+ final Map<String, InternalTopicConfig> repartitionTopicConfigsForTopology = new HashMap<>();
+
+ for (final Map.Entry<Subtopology, TopicsInfo> subtopologyEntry : topologyEntry.getValue().entrySet()) {
+ final TopicsInfo topicsInfo = subtopologyEntry.getValue();
+
+ topicsInfoForTopology.add(topicsInfo);
+ repartitionTopicConfigsForTopology.putAll(
topicsInfo.repartitionSourceTopics
.values()
.stream()
.collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+
+ final Set<String> missingSourceTopicsForSubtopology = computeMissingExternalSourceTopics(topicsInfo, clusterMetadata);
+ missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);
+ if (!missingSourceTopicsForSubtopology.isEmpty()) {
+ missingInputTopicsBySubtopology.put(subtopologyEntry.getKey(), missingSourceTopicsForSubtopology);
+ log.error("Subtopology {} was missing source topics {} and will be excluded from the current assignment, "
+ + "this can be due to the consumer client's metadata being stale or because they have "
+ + "not been created yet. Please verify that you have created all input topics; if they "
+ + "do exist, you just need to wait for the metadata to be updated, at which time a new "
+ + "rebalance will be kicked off automatically and the topology will be retried at that time."
+ + topologyName, missingSourceTopicsForTopology);
+ }
}
- if (missingSourceTopics.isEmpty()) {
- allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
- allTopicsInfo.addAll(topology.getValue());
+ if (missingSourceTopicsForTopology.isEmpty()) {
+ allRepartitionTopicConfigs.putAll(repartitionTopicConfigsForTopology);
+ allTopicsInfo.addAll(topicsInfoForTopology);
} else {
- missingUserInputTopicsPerTopology.put(topologyName, missingSourceTopics);
- log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, "
- + "this can be due to the consumer client's metadata being stale or because they have "
- + "not been created yet. Please verify that you have created all input topics; if they "
- + "do exist, you just need to wait for the metadata to be updated, at which time a new "
- + "rebalance will be kicked off automatically and the topology will be retried at that time."
- + topologyName, missingSourceTopics);
+ log.debug("Skipping repartition topic validation for entire topology {} due to missing source topics {}",
+ topologyName, missingSourceTopicsForTopology);
}
}
setRepartitionSourceTopicPartitionCount(allRepartitionTopicConfigs, allTopicsInfo, clusterMetadata);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a8d2c18..fa3e85b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -47,6 +47,9 @@ import org.apache.kafka.streams.processor.internals.assignment.ReferenceContaine
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.util.Queue;
+import java.util.function.BiConsumer;
import org.slf4j.Logger;
import java.time.Duration;
@@ -298,12 +301,15 @@ public class StreamThread extends Thread {
private final TopologyMetadata topologyMetadata;
private final java.util.function.Consumer<Long> cacheResizer;
- private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
+ private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
private final Runnable shutdownErrorHook;
// These must be Atomic references as they are shared and used to signal between the assignor and the stream thread
private final AtomicInteger assignmentErrorCode;
private final AtomicLong nextProbingRebalanceMs;
+ // recoverable errors (don't require killing thread) that we need to invoke the exception
+ // handler for, eg MissingSourceTopicException with named topologies
+ private final Queue<StreamsException> nonFatalExceptionsToHandle;
// These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
@@ -324,7 +330,7 @@ public class StreamThread extends Thread {
final StateRestoreListener userStateRestoreListener,
final int threadIdx,
final Runnable shutdownErrorHook,
- final java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler) {
+ final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
final String threadId = clientId + "-StreamThread-" + threadIdx;
final String logPrefix = String.format("stream-thread [%s] ", threadId);
@@ -419,6 +425,7 @@ public class StreamThread extends Thread {
logContext,
referenceContainer.assignmentErrorCode,
referenceContainer.nextScheduledRebalanceMs,
+ referenceContainer.nonFatalExceptionsToHandle,
shutdownErrorHook,
streamsUncaughtExceptionHandler,
cache::resize
@@ -479,8 +486,9 @@ public class StreamThread extends Thread {
final LogContext logContext,
final AtomicInteger assignmentErrorCode,
final AtomicLong nextProbingRebalanceMs,
+ final Queue<StreamsException> nonFatalExceptionsToHandle,
final Runnable shutdownErrorHook,
- final java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler,
+ final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler,
final java.util.function.Consumer<Long> cacheResizer) {
super(threadId);
this.stateLock = new Object();
@@ -538,6 +546,7 @@ public class StreamThread extends Thread {
this.changelogReader = changelogReader;
this.originalReset = originalReset;
this.nextProbingRebalanceMs = nextProbingRebalanceMs;
+ this.nonFatalExceptionsToHandle = nonFatalExceptionsToHandle;
this.getGroupInstanceID = mainConsumer.groupMetadata().groupInstanceId();
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
@@ -575,7 +584,7 @@ public class StreamThread extends Thread {
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
- this.streamsUncaughtExceptionHandler.accept(e);
+ this.streamsUncaughtExceptionHandler.accept(e, false);
} finally {
completeShutdown(cleanRun);
}
@@ -641,7 +650,7 @@ public class StreamThread extends Thread {
StreamsConfig.EXACTLY_ONCE_V2, StreamsConfig.EXACTLY_ONCE_BETA);
}
failedStreamThreadSensor.record();
- this.streamsUncaughtExceptionHandler.accept(new StreamsException(e));
+ this.streamsUncaughtExceptionHandler.accept(new StreamsException(e), false);
return false;
} catch (final StreamsException e) {
throw e;
@@ -657,7 +666,7 @@ public class StreamThread extends Thread {
*
* @param streamsUncaughtExceptionHandler the user handler wrapped in shell to execute the action
*/
- public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler) {
+ public void setStreamsUncaughtExceptionHandler(final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
}
@@ -969,6 +978,10 @@ public class StreamThread extends Thread {
pollRecordsSensor.record(numRecords, now);
taskManager.addRecordsToTasks(records);
}
+
+ while (!nonFatalExceptionsToHandle.isEmpty()) {
+ streamsUncaughtExceptionHandler.accept(nonFatalExceptionsToHandle.poll(), true);
+ }
return pollLatency;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index bcfa94b..80c506c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -178,6 +178,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private PartitionGrouper partitionGrouper;
private AtomicInteger assignmentErrorCode;
private AtomicLong nextScheduledRebalanceMs;
+ private Queue<StreamsException> nonFatalExceptionsToHandle;
private Time time;
protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
@@ -212,6 +213,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
streamsMetadataState = Objects.requireNonNull(referenceContainer.streamsMetadataState, "StreamsMetadataState was not specified");
assignmentErrorCode = referenceContainer.assignmentErrorCode;
nextScheduledRebalanceMs = referenceContainer.nextScheduledRebalanceMs;
+ nonFatalExceptionsToHandle = referenceContainer.nonFatalExceptionsToHandle;
time = Objects.requireNonNull(referenceContainer.time, "Time was not specified");
assignmentConfigs = assignorConfiguration.assignmentConfigs();
partitionGrouper = new PartitionGrouper();
@@ -382,7 +384,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// the maximum of the depending sub-topologies source topics' number of partitions
final RepartitionTopics repartitionTopics = prepareRepartitionTopics(metadata);
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();
- final Map<String, Set<String>> missingUserInputTopicsPerTopology = repartitionTopics.missingUserInputTopicsPerTopology();
final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
@@ -392,7 +393,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// construct the assignment of tasks to clients
final Map<Subtopology, TopicsInfo> topicGroups =
- taskManager.topologyMetadata().subtopologyTopicsInfoMapExcluding(missingUserInputTopicsPerTopology.keySet());
+ taskManager.topologyMetadata().subtopologyTopicsInfoMapExcluding(repartitionTopics.topologiesWithMissingInputTopics());
final Set<String> allSourceTopics = new HashSet<>();
final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
@@ -511,15 +512,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
metadata,
logPrefix
);
- final boolean isMissingInputTopics = !repartitionTopics.setup();
+ repartitionTopics.setup();
+ final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
if (isMissingInputTopics) {
if (!taskManager.topologyMetadata().hasNamedTopologies()) {
throw new MissingSourceTopicException("Missing source topics.");
+ } else {
+ nonFatalExceptionsToHandle.addAll(repartitionTopics.missingSourceTopicExceptions());
}
}
return repartitionTopics;
}
+
/**
* Populates the taskForPartition and tasksForTopicGroup maps, and checks that partitions are assigned to exactly
* one task.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index c3da7ce..3ff6e66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -450,34 +450,33 @@ public class TopologyMetadata {
return lookupBuilderForNamedTopology(topologyName).sourceTopicsForStore(storeName);
}
- private String getTopologyNameOrElseUnnamed(final String topologyName) {
+ public static String getTopologyNameOrElseUnnamed(final String topologyName) {
return topologyName == null ? UNNAMED_TOPOLOGY : topologyName;
}
/**
* @param topologiesToExclude the names of any topologies to exclude from the returned topic groups,
* eg because they have missing source topics and can't be processed yet
+ *
+ * @return flattened map of all subtopologies (from all topologies) to topics info
*/
public Map<Subtopology, TopicsInfo> subtopologyTopicsInfoMapExcluding(final Set<String> topologiesToExclude) {
- final Map<Subtopology, TopicsInfo> topicGroups = new HashMap<>();
- for (final InternalTopologyBuilder builder : builders.values()) {
- if (!topologiesToExclude.contains(builder.topologyName())) {
- topicGroups.putAll(builder.subtopologyToTopicsInfo());
+ final Map<Subtopology, TopicsInfo> subtopologyTopicsInfo = new HashMap<>();
+ applyToEachBuilder(b -> {
+ if (!topologiesToExclude.contains(b.topologyName())) {
+ subtopologyTopicsInfo.putAll(b.subtopologyToTopicsInfo());
}
- }
- return topicGroups;
+ });
+ return subtopologyTopicsInfo;
}
/**
- * @return map from topologies with missing external source topics to the set of missing topic names,
- * keyed by topology name or
+ * @return map from topology to its subtopologies and their topics info
*/
- public Map<String, Collection<TopicsInfo>> topicGroupsByTopology() {
- final Map<String, Collection<TopicsInfo>> topicGroups = new HashMap<>();
- applyToEachBuilder(
- b -> topicGroups.put(getTopologyNameOrElseUnnamed(b.topologyName()), b.subtopologyToTopicsInfo().values())
- );
- return topicGroups;
+ public Map<String, Map<Subtopology, TopicsInfo>> topologyToSubtopologyTopicsInfoMap() {
+ final Map<String, Map<Subtopology, TopicsInfo>> topologyToSubtopologyTopicsInfoMap = new HashMap<>();
+ applyToEachBuilder(b -> topologyToSubtopologyTopicsInfoMap.put(b.topologyName(), b.subtopologyToTopicsInfo()));
+ return topologyToSubtopologyTopicsInfoMap;
}
public Map<String, List<String>> nodeToSourceTopics(final TaskId task) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java
index fbf65e5..9b46eeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java
@@ -19,9 +19,12 @@ package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.TaskManager;
+import java.util.LinkedList;
+import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,5 +35,6 @@ public class ReferenceContainer {
public StreamsMetadataState streamsMetadataState;
public final AtomicInteger assignmentErrorCode = new AtomicInteger();
public final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
+ public final Queue<StreamsException> nonFatalExceptionsToHandle = new LinkedList<>();
public Time time;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 9ce39fd..73b4085 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -31,6 +31,9 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
@@ -52,6 +55,10 @@ import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -63,7 +70,6 @@ import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -76,10 +82,14 @@ import static org.apache.kafka.streams.KeyValue.pair;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.extractThreadId;
+import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
@@ -183,7 +193,6 @@ public class NamedTopologyIntegrationTest {
// builders for the 2nd Streams instance
private NamedTopologyBuilder topology1Builder2;
private NamedTopologyBuilder topology2Builder2;
- private NamedTopologyBuilder topology3Builder2;
private Properties configProps(final String appId, final String host) {
final Properties streamsConfiguration = new Properties();
@@ -226,7 +235,6 @@ public class NamedTopologyIntegrationTest {
streams2 = new KafkaStreamsNamedTopologyWrapper(props2, clientSupplier);
topology1Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_1);
topology2Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_2);
- topology3Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_3);
}
@After
@@ -666,6 +674,10 @@ public class NamedTopologyIntegrationTest {
topology1Builder.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
topology1Builder2.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+ final TrackingExceptionHandler handler = new TrackingExceptionHandler();
+ streams.setUncaughtExceptionHandler(handler);
+ streams2.setUncaughtExceptionHandler(handler);
+
streams.start(topology1Builder.build());
streams2.start(topology1Builder2.build());
waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
@@ -674,9 +686,18 @@ public class NamedTopologyIntegrationTest {
topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+ assertThat(handler.nextError(TOPOLOGY_2), nullValue());
+
streams.addNamedTopology(topology2Builder.build());
streams2.addNamedTopology(topology2Builder2.build());
+ // verify that the missing source topics were noticed and the handler invoked
+ retryOnExceptionWithTimeout(() -> {
+ final Throwable error = handler.nextError(TOPOLOGY_2);
+ assertThat(error, notNullValue());
+ assertThat(error.getCause().getClass(), is(MissingSourceTopicException.class));
+ });
+
// make sure the original topology can continue processing while waiting on the new source topics
produceToInputTopics(EXISTING_STREAM, singletonList(pair("A", 30L)));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 1), equalTo(singletonList(pair("A", 3L))));
@@ -684,6 +705,22 @@ public class NamedTopologyIntegrationTest {
CLUSTER.createTopic(NEW_STREAM, 2, 1);
produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+
+ // Make sure the threads were not actually killed and replaced
+ assertThat(streams.metadataForLocalThreads().size(), equalTo(2));
+ assertThat(streams2.metadataForLocalThreads().size(), equalTo(2));
+
+ final Set<String> localThreadsNames = streams.metadataForLocalThreads().stream()
+ .map(t -> extractThreadId(t.threadName()))
+ .collect(Collectors.toSet());
+ final Set<String> localThreadsNames2 = streams2.metadataForLocalThreads().stream()
+ .map(t -> extractThreadId(t.threadName()))
+ .collect(Collectors.toSet());
+
+ assertThat(localThreadsNames.contains("StreamThread-1"), is(true));
+ assertThat(localThreadsNames.contains("StreamThread-2"), is(true));
+ assertThat(localThreadsNames2.contains("StreamThread-1"), is(true));
+ assertThat(localThreadsNames2.contains("StreamThread-2"), is(true));
} finally {
CLUSTER.deleteTopicsAndWait(EXISTING_STREAM, NEW_STREAM);
}
@@ -695,15 +732,41 @@ public class NamedTopologyIntegrationTest {
topology1Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
topology1Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+ final TrackingExceptionHandler handler = new TrackingExceptionHandler();
+ streams.setUncaughtExceptionHandler(handler);
+ streams2.setUncaughtExceptionHandler(handler);
+
streams.start(topology1Builder.build());
streams2.start(topology1Builder2.build());
waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
+ retryOnExceptionWithTimeout(() -> {
+ final Throwable error = handler.nextError(TOPOLOGY_1);
+ assertThat(error, notNullValue());
+ assertThat(error.getCause().getClass(), is(MissingSourceTopicException.class));
+ });
+
try {
CLUSTER.createTopic(NEW_STREAM, 2, 1);
produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
+ // Make sure the threads were not actually killed and replaced
+ assertThat(streams.metadataForLocalThreads().size(), equalTo(2));
+ assertThat(streams2.metadataForLocalThreads().size(), equalTo(2));
+
+ final Set<String> localThreadsNames = streams.metadataForLocalThreads().stream()
+ .map(t -> extractThreadId(t.threadName()))
+ .collect(Collectors.toSet());
+ final Set<String> localThreadsNames2 = streams2.metadataForLocalThreads().stream()
+ .map(t -> extractThreadId(t.threadName()))
+ .collect(Collectors.toSet());
+
+ assertThat(localThreadsNames.contains("StreamThread-1"), is(true));
+ assertThat(localThreadsNames.contains("StreamThread-2"), is(true));
+ assertThat(localThreadsNames2.contains("StreamThread-1"), is(true));
+ assertThat(localThreadsNames2.contains("StreamThread-2"), is(true));
} finally {
CLUSTER.deleteTopicsAndWait(NEW_STREAM);
}
@@ -778,4 +841,29 @@ public class NamedTopologyIntegrationTest {
CLUSTER.time
);
}
+
+    /** Test handler that records uncaught exceptions per named topology so that tests can poll them. */
+    private static class TrackingExceptionHandler implements StreamsUncaughtExceptionHandler {
+        // errors that carry no task id are filed under the null key
+        private final Map<String, Queue<Throwable>> newErrorsByTopology = new HashMap<>();
+
+        @Override
+        public synchronized StreamThreadExceptionResponse handle(final Throwable exception) {
+            String topologyName = null;
+            if (exception instanceof StreamsException && ((StreamsException) exception).taskId().isPresent()) {
+                topologyName = ((StreamsException) exception).taskId().get().topologyName();
+            }
+            newErrorsByTopology.computeIfAbsent(topologyName, t -> new LinkedList<>()).add(exception);
+            // missing source topic -> REPLACE_THREAD, any other error -> SHUTDOWN_APPLICATION
+            return exception.getCause() instanceof MissingSourceTopicException
+                ? StreamThreadExceptionResponse.REPLACE_THREAD
+                : StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+        }
+
+        /** Removes and returns the oldest recorded error for the topology, or null if there is none. */
+        public synchronized Throwable nextError(final String topologyName) {
+            final Queue<Throwable> errors = newErrorsByTopology.get(topologyName);
+            return errors == null ? null : errors.poll();
+        }
+    }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index bacab10..5a020d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -21,7 +21,9 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
@@ -54,6 +56,7 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
@@ -145,6 +148,9 @@ public class RepartitionTopicsTest {
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_TOPIC_NAME1, 3);
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_TOPIC_NAME2, 0);
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_TOPIC_NAME2, 1);
+
+ assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+ assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
}
@Test
@@ -168,12 +174,16 @@ public class RepartitionTopicsTest {
clusterMetadata,
"[test] "
);
+ repartitionTopics.setup();
- assertThat(repartitionTopics.setup(), equalTo(false));
assertThat(
- repartitionTopics.missingUserInputTopicsPerTopology(),
- equalTo(Collections.singletonMap(UNNAMED_TOPOLOGY, missingSourceTopics))
+ repartitionTopics.topologiesWithMissingInputTopics(),
+ equalTo(Collections.singleton(UNNAMED_TOPOLOGY))
);
+ final StreamsException exception = repartitionTopics.missingSourceTopicExceptions().poll();
+ assertThat(exception, notNullValue());
+ assertThat(exception.taskId().isPresent(), is(true));
+ assertThat(exception.taskId().get(), equalTo(new TaskId(0, 0)));
}
@Test
@@ -204,6 +214,8 @@ public class RepartitionTopicsTest {
final TaskAssignmentException exception = assertThrows(TaskAssignmentException.class, repartitionTopics::setup);
assertThat(exception.getMessage(), is("Failed to compute number of partitions for all repartition topics, make sure all user input topics are created and all Pattern subscriptions match at least one topic in the cluster"));
+ assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+ assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
}
@Test
@@ -245,6 +257,8 @@ public class RepartitionTopicsTest {
exception.getMessage(),
is("No partition count found for source topic " + SOURCE_TOPIC_NAME1 + ", but it should have been.")
);
+ assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+ assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
}
@Test
@@ -299,6 +313,9 @@ public class RepartitionTopicsTest {
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 0);
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 1);
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 2);
+
+ assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+ assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
}
@Test
@@ -354,6 +371,9 @@ public class RepartitionTopicsTest {
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 1);
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 2);
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, REPARTITION_WITHOUT_PARTITION_COUNT, 3);
+
+ assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+ assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
}
@Test
@@ -381,6 +401,9 @@ public class RepartitionTopicsTest {
verify(internalTopicManager, internalTopologyBuilder);
final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = repartitionTopics.topicPartitionsInfo();
assertThat(topicPartitionsInfo, is(Collections.emptyMap()));
+
+ assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
+ assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
}
private void verifyRepartitionTopicPartitionInfo(final Map<TopicPartition, PartitionInfo> topicPartitionsInfo,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 20fab01..5b29961 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -79,6 +79,8 @@ import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
+
+import java.util.function.BiConsumer;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
@@ -157,7 +159,7 @@ public class StreamThreadTest {
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
private StreamsMetadataState streamsMetadataState;
- private final static java.util.function.Consumer<Throwable> HANDLER = e -> {
+ private final static BiConsumer<Throwable, Boolean> HANDLER = (e, b) -> {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else if (e instanceof Error) {
@@ -446,28 +448,9 @@ public class StreamThreadTest {
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
EasyMock.replay(consumer, consumerGroupMetadata);
- final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- null,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- );
+ topologyMetadata.buildAndRewriteTopology();
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
thread.setNow(mockTime.milliseconds());
thread.maybeCommit();
mockTime.sleep(commitInterval - 10L);
@@ -698,29 +681,10 @@ public class StreamThreadTest {
EasyMock.replay(consumer, consumerGroupMetadata);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
- final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- null,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- );
+
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
thread.setNow(mockTime.milliseconds());
thread.maybeCommit();
mockTime.sleep(commitInterval - 10L);
@@ -746,9 +710,6 @@ public class StreamThreadTest {
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
EasyMock.replay(consumer, consumerGroupMetadata);
- final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
-
final AtomicBoolean committed = new AtomicBoolean(false);
final TaskManager taskManager = new TaskManager(
null,
@@ -771,28 +732,9 @@ public class StreamThreadTest {
return 1;
}
};
-
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- changelogReader,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- );
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
thread.setNow(mockTime.milliseconds());
thread.maybeCommit();
@@ -863,25 +805,7 @@ public class StreamThreadTest {
};
taskManager.setMainConsumer(consumer);
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- changelogReader,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- );
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
thread.updateThreadMetadata("adminClientId");
thread.setState(StreamThread.State.STARTING);
@@ -1148,25 +1072,8 @@ public class StreamThreadTest {
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- null,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+ .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
thread.setStateListener(
(t, newState, oldState) -> {
if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
@@ -1223,6 +1130,7 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
null,
HANDLER,
null
@@ -1255,25 +1163,8 @@ public class StreamThreadTest {
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- null,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+ .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
thread.shutdown();
verify(taskManager);
}
@@ -1295,25 +1186,8 @@ public class StreamThreadTest {
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- null,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+ .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
thread.shutdown();
// Execute the run method. Verification of the mock will check that shutdown was only done once
thread.run();
@@ -2205,25 +2079,8 @@ public class StreamThreadTest {
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- null,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+ .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
consumer.schedulePollTask(() -> {
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
@@ -2255,25 +2112,8 @@ public class StreamThreadTest {
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- null,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
+ .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
consumer.schedulePollTask(() -> {
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
@@ -2333,6 +2173,7 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
null,
HANDLER,
null
@@ -2399,6 +2240,7 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
null,
HANDLER,
null
@@ -2412,7 +2254,7 @@ public class StreamThreadTest {
final AtomicBoolean exceptionHandlerInvoked = new AtomicBoolean(false);
- thread.setStreamsUncaughtExceptionHandler(e -> exceptionHandlerInvoked.set(true));
+ thread.setStreamsUncaughtExceptionHandler((e, b) -> exceptionHandlerInvoked.set(true));
thread.run();
verify(taskManager);
@@ -2473,6 +2315,7 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
null,
HANDLER,
null
@@ -2542,6 +2385,7 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
null,
HANDLER,
null
@@ -2609,6 +2453,7 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
null,
HANDLER,
null
@@ -2663,25 +2508,7 @@ public class StreamThreadTest {
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- null,
- consumer,
- consumer,
- null,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- );
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
EasyMock.replay(task1, task2, task3, taskManager);
@@ -2807,25 +2634,7 @@ public class StreamThreadTest {
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- final StreamThread thread = new StreamThread(
- mockTime,
- new StreamsConfig(configProps(true)),
- null,
- consumer,
- consumer,
- null,
- null,
- taskManager,
- streamsMetrics,
- topologyMetadata,
- CLIENT_ID,
- new LogContext(""),
- new AtomicInteger(),
- new AtomicLong(Long.MAX_VALUE),
- null,
- HANDLER,
- null
- );
+ final StreamThread thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
assertThat(dummyProducerMetrics, is(thread.producerMetrics()));
}
@@ -2865,6 +2674,7 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
null,
HANDLER,
null
@@ -2921,8 +2731,9 @@ public class StreamThreadTest {
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
null,
- e -> { },
+ (e, b) -> { },
null
) {
@Override
@@ -3016,4 +2827,33 @@ public class StreamThreadTest {
}
return null;
}
+
+    /**
+     * Creates a StreamThread for a test, supplying fixed defaults for every constructor argument
+     * that is not passed in.
+     *
+     * @param consumer passed twice, as the fourth and fifth constructor arguments
+     * @param taskManager the task manager handed to the thread
+     * @param config the streams config handed to the thread
+     * @param topologyMetadata the topology metadata handed to the thread
+     * @return the newly constructed thread
+     */
+    private StreamThread buildStreamThread(final Consumer<byte[], byte[]> consumer,
+                                           final TaskManager taskManager,
+                                           final StreamsConfig config,
+                                           final TopologyMetadata topologyMetadata) {
+        return new StreamThread(
+            mockTime, config, null,
+            consumer, consumer,
+            changelogReader, null, taskManager,
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime),
+            topologyMetadata,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            new LinkedList<>(),
+            null, HANDLER, null
+        );
+    }
}