You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/19 10:40:50 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #11686: KAFKA-12648: invoke exception handler for MissingSourceTopicException with named topologies

cadonna commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r787544943



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -515,11 +518,23 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
                 throw new MissingSourceTopicException("Missing source topics.");
+            } else {
+                for (final Map.Entry<String, Set<String>> topology : repartitionTopics.missingUserInputTopicsPerTopology().entrySet()) {
+                    final String topologyName = topology.getKey();
+                    final StreamsException exception = new StreamsException(
+                        new MissingSourceTopicException(String.format(
+                            "Missing source topics %s for topology %s",
+                            topology.getValue(),
+                            topologyName)),
+                        getDummyTaskIdForTopology(topologyName));

Review comment:
       Why do you need this dummy task ID?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -67,6 +67,9 @@
 
     private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
 
+    // Handler for recoverable StreamsExceptions which don't require killing/replacing the thread
+    private java.util.function.Consumer<Throwable> recoverableStreamsExceptionHandler;
+

Review comment:
       This is not used anywhere.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -600,9 +614,18 @@ public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInpu
             topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
             topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
 
+            assertThat(handler.nextError(TOPOLOGY_2), nullValue());
+
             streams.addNamedTopology(topology2Builder.build());
             streams2.addNamedTopology(topology2Builder2.build());
 
+            // verify that the missing source topics were noticed and the handler invoked
+            retryOnExceptionWithTimeout(() -> {
+                final Throwable error = handler.nextError(TOPOLOGY_2);
+                assertThat(error, notNullValue());
+                assertThat(error.getCause().getClass(), is(MissingSourceTopicException.class));
+            });
+

Review comment:
       Could you also verify that the stream thread was not replaced? You could use `KafkaStreams#metadataForLocalThreads()` for that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org