Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/14 12:52:39 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

cadonna commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r768599214



##########
File path: checkstyle/suppressions.xml
##########
@@ -158,7 +158,7 @@
               files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/>
 
     <suppress checks="MethodLength"
-              files="KTableImpl.java"/>
+              files="(KTableImpl|StreamsPartitionAssignor).java"/>

Review comment:
       Sorry for the standard question: Is this really necessary?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -85,24 +86,51 @@ public void setup() {
                 );
             }
         }
+
+        return missingExternalSourceTopicsPerTopology;
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
-                                                                           final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+    /**
+     * @param topicGroups                            information about the topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     * @param topologiesWithMissingSourceTopics  set of missing user input topics, to be filled in by this method
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
+                                                                           final Cluster clusterMetadata,
+                                                                           final Map<String, Set<String>> topologiesWithMissingSourceTopics) {
+        final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();
+        for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) {
+            final String topologyName = topology.getKey();
+            final Set<String> missingSourceTopicsPerTopology = new HashSet<>();
+            final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>();
+            for (final TopicsInfo topicsInfo : topology.getValue()) {
+                missingSourceTopicsPerTopology.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata));
+                repartitionTopicConfigsPerTopology.putAll(
+                    topicsInfo.repartitionSourceTopics
+                        .values()
+                        .stream()
+                        .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+            }
+            if (missingSourceTopicsPerTopology.isEmpty()) {
+                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+                allTopicsInfo.addAll(topology.getValue());
+            } else {
+                topologiesWithMissingSourceTopics.put(topologyName, missingSourceTopicsPerTopology);
+                log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, "
+                              + "this can be due to the consumer client's metadata being stale or because they have "
+                              + "not been created yet. Please verify that you have created all input topics. When the "
+                              + "metadata is updated a new rebalance will be kicked off automatically and the topology "
+                              + "will retried at that time.", topologyName, missingSourceTopicsPerTopology);

Review comment:
       ```suggestion
                                 + "will be retried at that time.", topologyName, missingSourceTopicsPerTopology);
       ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );

Review comment:
       Why log the same error a second time in different words? This might be confusing during debugging, and it bloats the logs.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -85,24 +86,51 @@ public void setup() {
                 );
             }
         }
+
+        return missingExternalSourceTopicsPerTopology;
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
-                                                                           final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+    /**
+     * @param topicGroups                            information about the topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     * @param topologiesWithMissingSourceTopics  set of missing user input topics, to be filled in by this method
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
+                                                                           final Cluster clusterMetadata,
+                                                                           final Map<String, Set<String>> topologiesWithMissingSourceTopics) {
+        final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();
+        for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) {
+            final String topologyName = topology.getKey();
+            final Set<String> missingSourceTopicsPerTopology = new HashSet<>();
+            final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>();
+            for (final TopicsInfo topicsInfo : topology.getValue()) {
+                missingSourceTopicsPerTopology.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata));
+                repartitionTopicConfigsPerTopology.putAll(
+                    topicsInfo.repartitionSourceTopics
+                        .values()
+                        .stream()
+                        .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+            }
+            if (missingSourceTopicsPerTopology.isEmpty()) {
+                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+                allTopicsInfo.addAll(topology.getValue());
+            } else {
+                topologiesWithMissingSourceTopics.put(topologyName, missingSourceTopicsPerTopology);
+                log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, "
+                              + "this can be due to the consumer client's metadata being stale or because they have "
+                              + "not been created yet. Please verify that you have created all input topics. When the "
+                              + "metadata is updated a new rebalance will be kicked off automatically and the topology "
+                              + "will retried at that time.", topologyName, missingSourceTopicsPerTopology);

Review comment:
       Should we be clearer here about what users need to do? If they created the input topics and this error occurs because the client's metadata is stale, they do not need to do anything except wait. If they did not create the input topics, they should create them. At least that is how I understand it.
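
       To illustrate the distinction, the message could be worded roughly as in the sketch below. The class `MissingSourceTopicsMessage` and the helper `describe` are hypothetical names used only to make the example runnable on its own; in the PR the text would simply be the argument to the existing `log.error(...)` call, and the exact wording is only one possibility.

```java
import java.util.Set;
import java.util.TreeSet;

public class MissingSourceTopicsMessage {

    // Hypothetical helper (not part of the PR): builds the text that would be logged.
    // It separates the two cases: topics never created vs. stale client metadata.
    public static String describe(final String topologyName, final Set<String> missingSourceTopics) {
        return String.format(
            "Topology %s is missing source topics %s and will be excluded from the current assignment. "
                + "If these topics have not been created yet, please create them. "
                + "If they already exist, the consumer client's metadata is probably stale and no action is needed: "
                + "a new rebalance will be kicked off automatically once the metadata is updated, "
                + "and the topology will be retried at that time.",
            topologyName,
            new TreeSet<>(missingSourceTopics)); // sorted copy, so the output order is stable
    }

    public static void main(final String[] args) {
        System.out.println(describe("topology-1", Set.of("input-a", "input-b")));
    }
}
```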

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1137,6 +1138,8 @@ private void completeShutdown(final boolean cleanRun) {
 
         log.info("Shutting down");
 
+        topologyMetadata.unregisterThread(threadMetadata.threadName());

Review comment:
       Why this move? Just curious...

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -85,24 +86,51 @@ public void setup() {
                 );
             }
         }
+
+        return missingExternalSourceTopicsPerTopology;
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
-                                                                           final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+    /**
+     * @param topicGroups                            information about the topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     * @param topologiesWithMissingSourceTopics  set of missing user input topics, to be filled in by this method
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
+                                                                           final Cluster clusterMetadata,
+                                                                           final Map<String, Set<String>> topologiesWithMissingSourceTopics) {

Review comment:
       nit: Could you rename `topologiesWithMissingSourceTopics` to `missingExternalSourceTopicsPerTopology`, or the other way around? Using one name consistently makes the code a bit easier to read.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );
+                if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                    throw new MissingSourceTopicException("Missing source topics.");
+                }
+            }
+
+            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();

Review comment:
       Could you extract this block into a method again and remove the checkstyle suppression?
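
       A rough sketch of what such an extraction might look like follows. To stay compilable without Kafka on the classpath it uses stand-ins: `RepartitionTopicsStandIn`, the nested `MissingSourceTopicException`, and the `Map<String, Integer>` return type are all assumptions made for illustration, replacing `RepartitionTopics`, the real exception class, and `Map<TopicPartition, PartitionInfo>`. It also leaves out the second `log.error(...)` call, on the assumption that the per-topology message logged in `RepartitionTopics` is sufficient.

```java
import java.util.Map;
import java.util.Set;

public class PrepareRepartitionTopicsSketch {

    // Stand-in for org.apache.kafka.streams.errors.MissingSourceTopicException.
    public static class MissingSourceTopicException extends RuntimeException {
        public MissingSourceTopicException(final String message) {
            super(message);
        }
    }

    // Simplified stand-in for RepartitionTopics. The String/Integer map replaces
    // Map<TopicPartition, PartitionInfo> only to avoid Kafka dependencies here.
    public interface RepartitionTopicsStandIn {
        Map<String, Set<String>> setup();
        Map<String, Integer> topicPartitionsInfo();
    }

    // Hypothetical extracted method: the block from assign() gathered in one place,
    // so assign() itself stays short enough for the MethodLength check.
    public static Map<String, Integer> prepareRepartitionTopics(final RepartitionTopicsStandIn repartitionTopics,
                                                                final boolean hasNamedTopologies) {
        final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
        if (!missingExternalSourceTopicsPerTopology.isEmpty() && !hasNamedTopologies) {
            // Without named topologies, missing source topics fail the assignment.
            throw new MissingSourceTopicException("Missing source topics.");
        }
        // With named topologies, no exception is thrown and the assignment continues.
        return repartitionTopics.topicPartitionsInfo();
    }

    public static void main(final String[] args) {
        final RepartitionTopicsStandIn stub = new RepartitionTopicsStandIn() {
            @Override
            public Map<String, Set<String>> setup() {
                return Map.of("topology-1", Set.of("input-a"));
            }

            @Override
            public Map<String, Integer> topicPartitionsInfo() {
                return Map.of("repartition-topic", 4);
            }
        };
        System.out.println(prepareRepartitionTopics(stub, true));
    }
}
```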

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );
+                if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                    throw new MissingSourceTopicException("Missing source topics.");
+                }

Review comment:
       What happens if all named topologies have missing source topics? Do you account for that somewhere else?
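
       To make the case concrete: it could be detected with a check along the lines of the sketch below. `AllTopologiesMissingCheck` and `allTopologiesMissingSourceTopics` are hypothetical names, and the set of all topology names is assumed to be available from the topology metadata; the sketch only shows how the condition could be recognized, not how the PR does or should react to it.

```java
import java.util.Map;
import java.util.Set;

public class AllTopologiesMissingCheck {

    // Hypothetical helper: returns true when every known named topology has at least
    // one entry in the map of missing source topics, i.e. nothing is left to assign.
    public static boolean allTopologiesMissingSourceTopics(
            final Set<String> allTopologyNames,
            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology) {
        return !allTopologyNames.isEmpty()
            && missingExternalSourceTopicsPerTopology.keySet().containsAll(allTopologyNames);
    }

    public static void main(final String[] args) {
        final Set<String> topologies = Set.of("topology-1", "topology-2");
        final Map<String, Set<String>> missing = Map.of(
            "topology-1", Set.of("input-a"),
            "topology-2", Set.of("input-b"));
        System.out.println(allTopologiesMissingSourceTopics(topologies, missing));
    }
}
```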



