Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/15 01:00:39 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

guozhangwang commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r769154123



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -317,7 +317,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
         int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
 
         boolean shutdownRequested = false;
-        boolean assignementErrorFound = false;
+        boolean assignmentErrorFound = false;

Review comment:
       Ah good find :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(

Review comment:
       Why inline this function? Maybe we can still avoid excluding `StreamsPartitionAssignor` from the function-length check if we extract enough LOC?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );
+                if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                    throw new MissingSourceTopicException("Missing source topics.");
+                }

Review comment:
       I think that's covered in the stream-thread where we would poll much longer when all tasks have missing topics.
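
       For readers following the hunk above, here is a minimal standalone sketch of the branch being discussed: log the missing source topics, then throw only when the application does not use named topologies. Everything in it is an assumption for illustration. `MissingSourceTopicsSketch`, `handleMissing`, `describeMissing`, and `MissingSourceTopicsError` are hypothetical stand-ins, not Kafka classes, and the returned flag only marks the point where the quoted hunk ends; what the assignor does next in the named-topology case is not shown here.

```java
import java.util.Map;
import java.util.Set;

public class MissingSourceTopicsSketch {

    // Hypothetical stand-in for Kafka's MissingSourceTopicException.
    static class MissingSourceTopicsError extends RuntimeException {
        MissingSourceTopicsError(final String message) {
            super(message);
        }
    }

    // Mirrors the ternary in the hunk: with named topologies, report the full
    // topology -> topics map; otherwise report only the topic sets.
    static Object describeMissing(final Map<String, Set<String>> missingPerTopology,
                                  final boolean hasNamedTopologies) {
        return hasNamedTopologies ? missingPerTopology : missingPerTopology.values();
    }

    // Returns true if missing topics were found and tolerated (named topologies),
    // false if nothing is missing; throws when there are no named topologies.
    static boolean handleMissing(final Map<String, Set<String>> missingPerTopology,
                                 final boolean hasNamedTopologies) {
        if (missingPerTopology.isEmpty()) {
            return false;
        }
        System.err.println("The following source topics are missing/unknown: "
            + describeMissing(missingPerTopology, hasNamedTopologies));
        if (!hasNamedTopologies) {
            throw new MissingSourceTopicsError("Missing source topics.");
        }
        return true;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> missing = Map.of("topology-a", Set.of("input-topic"));
        // Named topologies: the error is logged and the call returns normally.
        System.out.println(handleMissing(missing, true));
    }
}
```

       The point of the sketch is only that the exception is gated on `hasNamedTopologies()`, so a missing topic in one named topology is logged rather than failing the whole assignment.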



