Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/17 13:34:47 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

cadonna commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r785917415



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -488,23 +492,37 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     }
 
     /**
-     * Computes and assembles all repartition topic metadata then creates the topics if necessary.
+     * Computes and assembles all repartition topic metadata then creates the topics if necessary. Also verifies
+     * that all user input topics of each topology have been created ahead of time. If any such source topics are
+     * missing from a NamedTopology, the assignor will skip distributing its tasks until they have been created
+     * and invoke the exception handler (without killing the thread) once for each topology to alert the user of
+     * the missing topics.
+     * <p>
+     * For regular applications without named topologies, the assignor will instead send a shutdown signal to
+     * all clients so the user can identify and resolve the problem.
      *
-     * @return map from repartition topic to its partition info
+     * @return application metadata such as partition info of repartition topics, missing external topics, etc
      */
-    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Cluster metadata) {
-
+    private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             taskManager.topologyMetadata(),
             internalTopicManager,
             copartitionedTopicsEnforcer,
             metadata,
             logPrefix
         );
-        repartitionTopics.setup();
-        return repartitionTopics.topicPartitionsInfo();
+        final boolean isMissingInputTopics = !repartitionTopics.setup();
+        if (isMissingInputTopics) {
+            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                throw new MissingSourceTopicException("Missing source topics.");
+            } else {
+
+            }

Review comment:
       Did you intend to leave the else branch empty? If so, I would simply remove it.
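
If the branch is indeed meant to stay empty, the two nested checks could collapse into one condition. Below is a minimal, self-contained sketch of that shape. The method `verifyInputTopics`, its boolean parameters, and the nested exception class are hypothetical stand-ins so the snippet compiles without Kafka on the classpath; they are not the PR's actual code.

```java
final class EmptyElseSketch {

    // Hypothetical stand-in for Kafka's MissingSourceTopicException,
    // declared here only so the sketch compiles on its own.
    static final class MissingSourceTopicException extends RuntimeException {
        MissingSourceTopicException(final String message) {
            super(message);
        }
    }

    // Throws only for applications without named topologies; with named
    // topologies the method simply returns, so no else branch is needed.
    static void verifyInputTopics(final boolean isMissingInputTopics,
                                  final boolean hasNamedTopologies) {
        if (isMissingInputTopics && !hasNamedTopologies) {
            throw new MissingSourceTopicException("Missing source topics.");
        }
    }
}
```

If the named-topology case is supposed to do something at this point (for example, invoking the exception handler as the Javadoc describes), the else branch should contain that logic instead of being removed.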

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -59,50 +58,92 @@ public RepartitionTopics(final TopologyMetadata topologyMetadata,
         log = logContext.logger(getClass());
     }
 
-    public void setup() {
-        final Map<Subtopology, TopicsInfo> topicGroups = topologyMetadata.topicGroups();
-        final Map<String, InternalTopicConfig> repartitionTopicMetadata = computeRepartitionTopicConfig(topicGroups, clusterMetadata);
-
-        // ensure the co-partitioning topics within the group have the same number of partitions,
-        // and enforce the number of partitions for those repartition topics to be the same if they
-        // are co-partitioned as well.
-        ensureCopartitioning(topologyMetadata.copartitionGroups(), repartitionTopicMetadata, clusterMetadata);
-
-        // make sure the repartition source topics exist with the right number of partitions,
-        // create these topics if necessary
-        internalTopicManager.makeReady(repartitionTopicMetadata);
-
-        // augment the metadata with the newly computed number of partitions for all the
-        // repartition source topics
-        for (final Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) {
-            final String topic = entry.getKey();
-            final int numPartitions = entry.getValue().numberOfPartitions().orElse(-1);
-
-            for (int partition = 0; partition < numPartitions; partition++) {
-                topicPartitionInfos.put(
-                    new TopicPartition(topic, partition),
-                    new PartitionInfo(topic, partition, null, new Node[0], new Node[0])
-                );
+    /**
+     * @return   true iff setup was completed successfully and all user input topics were verified to exist
+     */
+    public boolean setup() {
+        final Map<String, Collection<TopicsInfo>> topicGroups = topologyMetadata.topicGroupsByTopology();
+        final Map<String, InternalTopicConfig> repartitionTopicMetadata
+            = computeRepartitionTopicConfig(topicGroups, clusterMetadata);
+
+        if (repartitionTopicMetadata.isEmpty()) {
+            if (missingUserInputTopicsPerTopology.isEmpty()) {
+                log.info("Skipping the repartition topic validation since there are no repartition topics.");
+            } else {
+                log.info("Skipping the repartition topic validation since all topologies containing repartition"
+                             + "topics are missing external user source topics and cannot be processed.");
+            }
+        } else {
+            // ensure the co-partitioning topics within the group have the same number of partitions,
+            // and enforce the number of partitions for those repartition topics to be the same if they
+            // are co-partitioned as well.
+            ensureCopartitioning(topologyMetadata.copartitionGroups(), repartitionTopicMetadata, clusterMetadata);
+
+            // make sure the repartition source topics exist with the right number of partitions,
+            // create these topics if necessary
+            internalTopicManager.makeReady(repartitionTopicMetadata);
+
+            // augment the metadata with the newly computed number of partitions for all the
+            // repartition source topics
+            for (final Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) {
+                final String topic = entry.getKey();
+                final int numPartitions = entry.getValue().numberOfPartitions().orElse(-1);
+
+                for (int partition = 0; partition < numPartitions; partition++) {
+                    topicPartitionInfos.put(
+                        new TopicPartition(topic, partition),
+                        new PartitionInfo(topic, partition, null, new Node[0], new Node[0])
+                    );
+                }
             }
         }
+
+        return missingUserInputTopicsPerTopology.isEmpty();
+    }
+
+    public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
+        return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
+    /**
+     * @param topicGroups                            information about the topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
                                                                            final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+        final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();
+        for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) {
+            final String topologyName = topology.getKey();
+            final Set<String> missingSourceTopics = new HashSet<>();
+            final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>();
+            for (final TopicsInfo topicsInfo : topology.getValue()) {
+                missingSourceTopics.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata));
+                repartitionTopicConfigsPerTopology.putAll(
+                    topicsInfo.repartitionSourceTopics
+                        .values()
+                        .stream()
+                        .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+            }
+            if (missingSourceTopics.isEmpty()) {
+                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+                allTopicsInfo.addAll(topology.getValue());
+            } else {
+                missingUserInputTopicsPerTopology.put(topologyName, missingSourceTopics);
+                log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, "
+                              + "this can be due to the consumer client's metadata being stale or because they have "
+                              + "not been created yet. Please verify that you have created all input topics. When the "
+                              + "metadata is updated a new rebalance will be kicked off automatically and the topology "
+                              + "will retried at that time.", topologyName, missingSourceTopics);

Review comment:
       Just commenting again here since my previous comments got outdated due to unrelated changes:
   https://github.com/apache/kafka/pull/11600#discussion_r768605785
   https://github.com/apache/kafka/pull/11600#discussion_r768604129
   
   
   

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -589,6 +580,52 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop
         CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
     }
 
+    @Test
+    public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() throws Exception {
+        setupSecondKafkaStreams();
+        topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+        topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+
+        streams.start(topology1Builder.build());
+        streams2.start(topology1Builder2.build());
+        waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
+        topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+        topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+
+        streams.addNamedTopology(topology2Builder.build());
+        streams2.addNamedTopology(topology2Builder2.build());
+

Review comment:
       Shouldn't you verify that topology 1 still produces output records at this point? Reading the test name, I would expect that verification here.
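
To make the expected behavior concrete, here is a broker-free model of what the test name promises: a topology with missing input topics is excluded, while the other topologies stay assignable. It only mirrors the per-topology bookkeeping visible in the `RepartitionTopics` diff above. The class, method names, and topic strings are hypothetical and exist purely for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class MissingTopicsSketch {

    // For each topology, collect the source topics that do not exist in the
    // cluster. Topologies whose inputs all exist are left out of the result.
    static Map<String, Set<String>> missingPerTopology(final Map<String, Set<String>> sourceTopicsByTopology,
                                                       final Set<String> existingTopics) {
        final Map<String, Set<String>> missing = new HashMap<>();
        for (final Map.Entry<String, Set<String>> topology : sourceTopicsByTopology.entrySet()) {
            final Set<String> absent = new HashSet<>(topology.getValue());
            absent.removeAll(existingTopics);
            if (!absent.isEmpty()) {
                missing.put(topology.getKey(), absent);
            }
        }
        return missing;
    }

    // Topologies that can still be processed: every topology that has no
    // missing source topics.
    static Set<String> assignableTopologies(final Map<String, Set<String>> sourceTopicsByTopology,
                                            final Set<String> existingTopics) {
        final Set<String> assignable = new HashSet<>(sourceTopicsByTopology.keySet());
        assignable.removeAll(missingPerTopology(sourceTopicsByTopology, existingTopics).keySet());
        return assignable;
    }
}
```

In the integration test, the equivalent check would be an assertion that topology 1 keeps producing records to its output topic after the second topology is added and before `NEW_STREAM` is created.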

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -589,6 +580,52 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop
         CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
     }
 
+    @Test
+    public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() throws Exception {
+        setupSecondKafkaStreams();
+        topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+        topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+
+        streams.start(topology1Builder.build());
+        streams2.start(topology1Builder2.build());
+        waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
+        topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+        topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+
+        streams.addNamedTopology(topology2Builder.build());
+        streams2.addNamedTopology(topology2Builder2.build());
+
+        try {
+            CLUSTER.createTopic(NEW_STREAM, 2, 1);
+            produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+        } finally {
+            CLUSTER.deleteTopicsAndWait(NEW_STREAM);
+        }
+    }
+
+    @Test
+    public void shouldWaitForMissingInputTopicsToBeCreatedWhileOtherTopologyContinuesProcessing() throws Exception {

Review comment:
       The content of this test does not match its name: there is no other topology that continues processing.



