You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/14 06:17:39 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

ableegoldman opened a new pull request #11600:
URL: https://github.com/apache/kafka/pull/11600


   Another source of flakiness we found in the NamedTopologyIntegrationTest was an ocasional MissingSourceTopicException that was causing the application to shut down. We created all source topics ahead of time in the tests, leading us to discover this [race condition](https://issues.apache.org/jira/browse/KAFKA-13543) in the consumer client which can lead to spurious MissingSourceTopicExceptions when the metadata hasn't finished updating after a change in the consumer's subscription.
   
   In addition to finding a workaround for this bug, throwing this MissingSourceTopicException and shutting down the entire app is itself a bug in the NamedTopology feature -- we should not stop all clients and prevent any further processing of the completely valid topologies just because one (or more) topologies were added that are missing their source topics. We can just remove those topologies from the assignment for the time being, and wait until the metadata has finished updating or the user has created the input topics to start assigning tasks from them.
   
   So, this PR does two things:
   a) Avoid throwing a MissingSourceTopicException inside the #assign method when named topologies are used, and just remove those topologies which are missing any of their input topics from the assignment. 
   b) Trigger the uncaught exception handler with a MissingSourceTopicException for each of the topologies that are missing topics, but don't shut down the thread -- we just want to make sure this issue is made visible to the user.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r781821011



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -436,9 +436,25 @@ public String getStoreForChangelogTopic(final String topicName) {
         return sourceTopics;
     }
 
-    public Map<Subtopology, TopicsInfo> topicGroups() {
+    /**
+     * @param topologiesToExclude the names of any topologies to exclude from the returned topic groups,
+     *                            eg because they have missing source topics and can't be processed yet
+     */
+    public Map<Subtopology, TopicsInfo> topicGroups(final Set<String> topologiesToExclude) {

Review comment:
       This method is used a pretty large number of times and I don't want to bloat the PR with style changes, but I totally agree that this name is confusing. I used to never remember what it was and would have to click through to the method implementation and skim the code to figure out what it was doing anytime it came up in the code. Anyways I can file a followup ticket for this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r781948695



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );
+                if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                    throw new MissingSourceTopicException("Missing source topics.");
+                }
+            }
+
+            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();

Review comment:
       ok, ok, you guys convinced me like 4 comments ago :P




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r786451847



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -589,6 +580,52 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop
         CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
     }
 
+    @Test
+    public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() throws Exception {
+        setupSecondKafkaStreams();
+        topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+        topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+
+        streams.start(topology1Builder.build());
+        streams2.start(topology1Builder2.build());
+        waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
+        topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+        topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+
+        streams.addNamedTopology(topology2Builder.build());
+        streams2.addNamedTopology(topology2Builder2.build());
+

Review comment:
       Ah good catch, added to the test




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r786702599



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
##########
@@ -162,14 +169,17 @@ public void shouldThrowMissingSourceTopicException() {
             "[test] "
         );
 
-        assertThrows(MissingSourceTopicException.class, repartitionTopics::setup);
+        assertThat(repartitionTopics.setup(), equalTo(false));

Review comment:
       Could you also verify that in the cases without missing source topics `setup()` returns `true` in the other unit tests? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -488,21 +492,32 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     }
 
     /**
-     * Computes and assembles all repartition topic metadata then creates the topics if necessary.
+     * Computes and assembles all repartition topic metadata then creates the topics if necessary. Also verifies
+     * that all user input topics of each topology have been created ahead of time. If any such source topics are
+     * missing from a NamedTopology, the assignor will skip distributing its tasks until they have been created
+     * and invoke the exception handler (without killing the thread) once for each topology to alert the user of
+     * the missing topics.
+     * <p>
+     * For regular applications without named topologies, the assignor will instead send a shutdown signal to
+     * all clients so the user can identify and resolve the problem.
      *
-     * @return map from repartition topic to its partition info
+     * @return application metadata such as partition info of repartition topics, missing external topics, etc
      */
-    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Cluster metadata) {
-
+    private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             taskManager.topologyMetadata(),
             internalTopicManager,
             copartitionedTopicsEnforcer,
             metadata,
             logPrefix
         );
-        repartitionTopics.setup();
-        return repartitionTopics.topicPartitionsInfo();
+        final boolean isMissingInputTopics = !repartitionTopics.setup();
+        if (isMissingInputTopics) {
+            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                throw new MissingSourceTopicException("Missing source topics.");
+            }
+        }

Review comment:
       That is just a nit that you might consider in the follow-up PR: I would not return a boolean from `setup()` since actually the setup succeeds even if the return value is `false`. I would check the return value of `RepartitionTopics#missingUserInputTopicsPerTopology()`. You could even move all this checks into `RepartitionTopics`. I think, it would make the code a bit cleaner.   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r782209624



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(

Review comment:
       I wanted to break up the `prepareRepartitinTopics` call into explicit steps so I could get the missing source topics returned by the middle step, but I think there's a better way to achieve what we need here so I'll clean it up




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r781817617



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1137,6 +1138,8 @@ private void completeShutdown(final boolean cleanRun) {
 
         log.info("Shutting down");
 
+        topologyMetadata.unregisterThread(threadMetadata.threadName());

Review comment:
       The `#addNamedTopology` or `#removeNamedTopology` APIs will block until all local threads have ack'ed the topology change. I moved the un-registration to the beginning of the shutdown to prevent it from hanging if we get stuck during one of the other cleanup tasks (IIRC there is an edge case bug in the producer that can cause it to hang permanently during #close), and also to avoid blocking unnecessarily while the thread completes its shutdown, since we don't care if a removed thread gets the update or not




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r768947553



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -436,9 +436,25 @@ public String getStoreForChangelogTopic(final String topicName) {
         return sourceTopics;
     }
 
-    public Map<Subtopology, TopicsInfo> topicGroups() {
+    /**
+     * @param topologiesToExclude the names of any topologies to exclude from the returned topic groups,
+     *                            eg because they have missing source topics and can't be processed yet
+     */
+    public Map<Subtopology, TopicsInfo> topicGroups(final Set<String> topologiesToExclude) {

Review comment:
       nit: can we update the name to be more descriptive?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r769154123



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -317,7 +317,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
         int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
 
         boolean shutdownRequested = false;
-        boolean assignementErrorFound = false;
+        boolean assignmentErrorFound = false;

Review comment:
       Ah good find :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(

Review comment:
       Why inline this function? Maybe we can still avoid `StreamsPartitionAssignor` to exclude the function length code if we extra enough LOC?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );
+                if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                    throw new MissingSourceTopicException("Missing source topics.");
+                }

Review comment:
       I think that's covered in the stream-thread where we would poll much longer when all tasks have missing topics.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r782215325



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );

Review comment:
       Good point, I think I changed how things worked halfway through this PR and forgot to update/put some things back the way they were when the changes were no longer necessary (like this and inlining the `prepareRepartitionTopics` method)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r785917415



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -488,23 +492,37 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     }
 
     /**
-     * Computes and assembles all repartition topic metadata then creates the topics if necessary.
+     * Computes and assembles all repartition topic metadata then creates the topics if necessary. Also verifies
+     * that all user input topics of each topology have been created ahead of time. If any such source topics are
+     * missing from a NamedTopology, the assignor will skip distributing its tasks until they have been created
+     * and invoke the exception handler (without killing the thread) once for each topology to alert the user of
+     * the missing topics.
+     * <p>
+     * For regular applications without named topologies, the assignor will instead send a shutdown signal to
+     * all clients so the user can identify and resolve the problem.
      *
-     * @return map from repartition topic to its partition info
+     * @return application metadata such as partition info of repartition topics, missing external topics, etc
      */
-    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Cluster metadata) {
-
+    private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             taskManager.topologyMetadata(),
             internalTopicManager,
             copartitionedTopicsEnforcer,
             metadata,
             logPrefix
         );
-        repartitionTopics.setup();
-        return repartitionTopics.topicPartitionsInfo();
+        final boolean isMissingInputTopics = !repartitionTopics.setup();
+        if (isMissingInputTopics) {
+            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                throw new MissingSourceTopicException("Missing source topics.");
+            } else {
+
+            }

Review comment:
       Did you intend to leave the the else branch empty? If yes, I would simply remove it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -59,50 +58,92 @@ public RepartitionTopics(final TopologyMetadata topologyMetadata,
         log = logContext.logger(getClass());
     }
 
-    public void setup() {
-        final Map<Subtopology, TopicsInfo> topicGroups = topologyMetadata.topicGroups();
-        final Map<String, InternalTopicConfig> repartitionTopicMetadata = computeRepartitionTopicConfig(topicGroups, clusterMetadata);
-
-        // ensure the co-partitioning topics within the group have the same number of partitions,
-        // and enforce the number of partitions for those repartition topics to be the same if they
-        // are co-partitioned as well.
-        ensureCopartitioning(topologyMetadata.copartitionGroups(), repartitionTopicMetadata, clusterMetadata);
-
-        // make sure the repartition source topics exist with the right number of partitions,
-        // create these topics if necessary
-        internalTopicManager.makeReady(repartitionTopicMetadata);
-
-        // augment the metadata with the newly computed number of partitions for all the
-        // repartition source topics
-        for (final Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) {
-            final String topic = entry.getKey();
-            final int numPartitions = entry.getValue().numberOfPartitions().orElse(-1);
-
-            for (int partition = 0; partition < numPartitions; partition++) {
-                topicPartitionInfos.put(
-                    new TopicPartition(topic, partition),
-                    new PartitionInfo(topic, partition, null, new Node[0], new Node[0])
-                );
+    /**
+     * @return   true iff setup was completed successfully and all user input topics were verified to exist
+     */
+    public boolean setup() {
+        final Map<String, Collection<TopicsInfo>> topicGroups = topologyMetadata.topicGroupsByTopology();
+        final Map<String, InternalTopicConfig> repartitionTopicMetadata
+            = computeRepartitionTopicConfig(topicGroups, clusterMetadata);
+
+        if (repartitionTopicMetadata.isEmpty()) {
+            if (missingUserInputTopicsPerTopology.isEmpty()) {
+                log.info("Skipping the repartition topic validation since there are no repartition topics.");
+            } else {
+                log.info("Skipping the repartition topic validation since all topologies containing repartition"
+                             + "topics are missing external user source topics and cannot be processed.");
+            }
+        } else {
+            // ensure the co-partitioning topics within the group have the same number of partitions,
+            // and enforce the number of partitions for those repartition topics to be the same if they
+            // are co-partitioned as well.
+            ensureCopartitioning(topologyMetadata.copartitionGroups(), repartitionTopicMetadata, clusterMetadata);
+
+            // make sure the repartition source topics exist with the right number of partitions,
+            // create these topics if necessary
+            internalTopicManager.makeReady(repartitionTopicMetadata);
+
+            // augment the metadata with the newly computed number of partitions for all the
+            // repartition source topics
+            for (final Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) {
+                final String topic = entry.getKey();
+                final int numPartitions = entry.getValue().numberOfPartitions().orElse(-1);
+
+                for (int partition = 0; partition < numPartitions; partition++) {
+                    topicPartitionInfos.put(
+                        new TopicPartition(topic, partition),
+                        new PartitionInfo(topic, partition, null, new Node[0], new Node[0])
+                    );
+                }
             }
         }
+
+        return missingUserInputTopicsPerTopology.isEmpty();
+    }
+
+    public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
+        return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
+    /**
+     * @param topicGroups                            information about the topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
                                                                            final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+        final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();
+        for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) {
+            final String topologyName = topology.getKey();
+            final Set<String> missingSourceTopics = new HashSet<>();
+            final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>();
+            for (final TopicsInfo topicsInfo : topology.getValue()) {
+                missingSourceTopics.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata));
+                repartitionTopicConfigsPerTopology.putAll(
+                    topicsInfo.repartitionSourceTopics
+                        .values()
+                        .stream()
+                        .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+            }
+            if (missingSourceTopics.isEmpty()) {
+                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+                allTopicsInfo.addAll(topology.getValue());
+            } else {
+                missingUserInputTopicsPerTopology.put(topologyName, missingSourceTopics);
+                log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, "
+                              + "this can be due to the consumer client's metadata being stale or because they have "
+                              + "not been created yet. Please verify that you have created all input topics. When the "
+                              + "metadata is updated a new rebalance will be kicked off automatically and the topology "
+                              + "will retried at that time.", topologyName, missingSourceTopics);

Review comment:
       Just commenting again here since my previous comments got outdated due to unrelated changes:
   https://github.com/apache/kafka/pull/11600#discussion_r768605785
   https://github.com/apache/kafka/pull/11600#discussion_r768604129
   
   
   

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -589,6 +580,52 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop
         CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
     }
 
+    @Test
+    public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() throws Exception {
+        setupSecondKafkaStreams();
+        topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+        topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+
+        streams.start(topology1Builder.build());
+        streams2.start(topology1Builder2.build());
+        waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
+        topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+        topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+
+        streams.addNamedTopology(topology2Builder.build());
+        streams2.addNamedTopology(topology2Builder2.build());
+

Review comment:
       Shouldn't you verify if topology 1 still produces output records at this point? When I read the test name I would expect that verification here.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -589,6 +580,52 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop
         CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
     }
 
+    @Test
+    public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() throws Exception {
+        setupSecondKafkaStreams();
+        topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+        topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+
+        streams.start(topology1Builder.build());
+        streams2.start(topology1Builder2.build());
+        waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
+        topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+        topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+
+        streams.addNamedTopology(topology2Builder.build());
+        streams2.addNamedTopology(topology2Builder2.build());
+
+        try {
+            CLUSTER.createTopic(NEW_STREAM, 2, 1);
+            produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+        } finally {
+            CLUSTER.deleteTopicsAndWait(NEW_STREAM);
+        }
+    }
+
+    @Test
+    public void shouldWaitForMissingInputTopicsToBeCreatedWhileOtherTopologyContinuesProcessing() throws Exception {

Review comment:
       The content of this test does not fit with the name of the test. There is no other topology that continues processing. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r786445650



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -85,24 +86,51 @@ public void setup() {
                 );
             }
         }
+
+        return missingExternalSourceTopicsPerTopology;
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
-                                                                           final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+    /**
+     * @param topicGroups                            information about the topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     * @param topologiesWithMissingSourceTopics  set of missing user input topics, to be filled in by this method
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
+                                                                           final Cluster clusterMetadata,
+                                                                           final Map<String, Set<String>> topologiesWithMissingSourceTopics) {
+        final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();
+        for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) {
+            final String topologyName = topology.getKey();
+            final Set<String> missingSourceTopicsPerTopology = new HashSet<>();
+            final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>();
+            for (final TopicsInfo topicsInfo : topology.getValue()) {
+                missingSourceTopicsPerTopology.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata));
+                repartitionTopicConfigsPerTopology.putAll(
+                    topicsInfo.repartitionSourceTopics
+                        .values()
+                        .stream()
+                        .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+            }
+            if (missingSourceTopicsPerTopology.isEmpty()) {
+                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+                allTopicsInfo.addAll(topology.getValue());
+            } else {
+                topologiesWithMissingSourceTopics.put(topologyName, missingSourceTopicsPerTopology);
+                log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, "
+                              + "this can be due to the consumer client's metadata being stale or because they have "
+                              + "not been created yet. Please verify that you have created all input topics. When the "
+                              + "metadata is updated a new rebalance will be kicked off automatically and the topology "
+                              + "will retried at that time.", topologyName, missingSourceTopicsPerTopology);

Review comment:
       That's what I was trying to get across, but I can try to be more clear




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#issuecomment-993971910


   @ableegoldman I checked the source code trying to reason what's the difference between 1) subscribe for the first time, and then poll to potentially rebalance, 2) re-subscribe, and then poll to potentially rebalance.
   
   For 1) we have the logic to enforce metadata being fresh until we call `ensureActiveGroupensureActiveGroup`, so that's cleared as long as all members `subscribe` with the same topics.
   
   For 2) the same logic is there, so the member kicking start the rebalance with the new join-group should always have the updated metadata, it's just that the leader who's actually triggering assign may not get the `subscribe` call yet --- since if the leader has triggered `subscribe`, it will set the `needPartialUpdate` flag in metadata and upon being requested to re-join, it would still wait until metadata to be refreshed.
   
   So that case in 2) would be similar to 1) where you have two members starting up, one that called `subscribe(A,B)` while the other one called `subscribe(A)` but the second one is picked as the leader. So in Streams' scenario, it's similar to two instances with different topologies but configured with the same `group id` --- in the past we treat it as fatal since instances cannot change their topology after starting up, but now they can, and we do not know if this is just temporary that soon the instances would change their topologies to match, or never --- so I think we should now treat it as transient. But the question is then, do we still need part1 of the PR if we do part2 to not treat it as final? If we just expose the `MissingSourceTopicException` to the handler and explain that "this may be transient, but we do not know" but now killing the thread, would that be sufficient?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#issuecomment-996549823


   @ableegoldman There are checkstyle errors in the builds.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r781814872



##########
File path: checkstyle/suppressions.xml
##########
@@ -158,7 +158,7 @@
               files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/>
 
     <suppress checks="MethodLength"
-              files="KTableImpl.java"/>
+              files="(KTableImpl|StreamsPartitionAssignor).java"/>

Review comment:
       I would prefer to leave out any nontrivial refactoring such as breaking up the assignor's ridiculously long `assign` method, though I do agree that it needs to be broken up (again -- I actually did a pretty hefty refactoring to clean this method up a while back, I guess it's just gotten out of control again since then :/ )
   
   Filed a followup ticket to revisit this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #11600:
URL: https://github.com/apache/kafka/pull/11600


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r786448672



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -488,23 +492,37 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     }
 
     /**
-     * Computes and assembles all repartition topic metadata then creates the topics if necessary.
+     * Computes and assembles all repartition topic metadata then creates the topics if necessary. Also verifies
+     * that all user input topics of each topology have been created ahead of time. If any such source topics are
+     * missing from a NamedTopology, the assignor will skip distributing its tasks until they have been created
+     * and invoke the exception handler (without killing the thread) once for each topology to alert the user of
+     * the missing topics.
+     * <p>
+     * For regular applications without named topologies, the assignor will instead send a shutdown signal to
+     * all clients so the user can identify and resolve the problem.
      *
-     * @return map from repartition topic to its partition info
+     * @return application metadata such as partition info of repartition topics, missing external topics, etc
      */
-    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Cluster metadata) {
-
+    private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             taskManager.topologyMetadata(),
             internalTopicManager,
             copartitionedTopicsEnforcer,
             metadata,
             logPrefix
         );
-        repartitionTopics.setup();
-        return repartitionTopics.topicPartitionsInfo();
+        final boolean isMissingInputTopics = !repartitionTopics.setup();
+        if (isMissingInputTopics) {
+            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                throw new MissingSourceTopicException("Missing source topics.");
+            } else {
+
+            }

Review comment:
       I was working on debugging a test before pushing the changes to invoke the uncaught exception handler, I think I'm just going to split that out into a separate PR after all. I'll clean this up for this PR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#issuecomment-1015777307


   Test failures are unrelated, will address @cadonna 's remaining feedback in the followup PR https://github.com/apache/kafka/pull/11686 -- going to merge
   
   failures:
   - kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
   - kafka.server.LogDirFailureTest.testProduceErrorFromFailureOnLogRoll()


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r768599214



##########
File path: checkstyle/suppressions.xml
##########
@@ -158,7 +158,7 @@
               files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/>
 
     <suppress checks="MethodLength"
-              files="KTableImpl.java"/>
+              files="(KTableImpl|StreamsPartitionAssignor).java"/>

Review comment:
       Sorry for the standard question: Is this really necessary?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -85,24 +86,51 @@ public void setup() {
                 );
             }
         }
+
+        return missingExternalSourceTopicsPerTopology;
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
-                                                                           final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+    /**
+     * @param topicGroups                            information about the topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     * @param topologiesWithMissingSourceTopics  set of missing user input topics, to be filled in by this method
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
+                                                                           final Cluster clusterMetadata,
+                                                                           final Map<String, Set<String>> topologiesWithMissingSourceTopics) {
+        final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();
+        for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) {
+            final String topologyName = topology.getKey();
+            final Set<String> missingSourceTopicsPerTopology = new HashSet<>();
+            final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>();
+            for (final TopicsInfo topicsInfo : topology.getValue()) {
+                missingSourceTopicsPerTopology.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata));
+                repartitionTopicConfigsPerTopology.putAll(
+                    topicsInfo.repartitionSourceTopics
+                        .values()
+                        .stream()
+                        .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+            }
+            if (missingSourceTopicsPerTopology.isEmpty()) {
+                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+                allTopicsInfo.addAll(topology.getValue());
+            } else {
+                topologiesWithMissingSourceTopics.put(topologyName, missingSourceTopicsPerTopology);
+                log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, "
+                              + "this can be due to the consumer client's metadata being stale or because they have "
+                              + "not been created yet. Please verify that you have created all input topics. When the "
+                              + "metadata is updated a new rebalance will be kicked off automatically and the topology "
+                              + "will retried at that time.", topologyName, missingSourceTopicsPerTopology);

Review comment:
       ```suggestion
                                 + "will be retried at that time.", topologyName, missingSourceTopicsPerTopology);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );

Review comment:
       Why do you log again the same error with other words? Might be confusing during debugging and bloats the logs. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -85,24 +86,51 @@ public void setup() {
                 );
             }
         }
+
+        return missingExternalSourceTopicsPerTopology;
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
-                                                                           final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+    /**
+     * @param topicGroups                            information about the topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     * @param topologiesWithMissingSourceTopics  set of missing user input topics, to be filled in by this method
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
+                                                                           final Cluster clusterMetadata,
+                                                                           final Map<String, Set<String>> topologiesWithMissingSourceTopics) {
+        final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();
+        for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) {
+            final String topologyName = topology.getKey();
+            final Set<String> missingSourceTopicsPerTopology = new HashSet<>();
+            final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>();
+            for (final TopicsInfo topicsInfo : topology.getValue()) {
+                missingSourceTopicsPerTopology.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata));
+                repartitionTopicConfigsPerTopology.putAll(
+                    topicsInfo.repartitionSourceTopics
+                        .values()
+                        .stream()
+                        .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+            }
+            if (missingSourceTopicsPerTopology.isEmpty()) {
+                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+                allTopicsInfo.addAll(topology.getValue());
+            } else {
+                topologiesWithMissingSourceTopics.put(topologyName, missingSourceTopicsPerTopology);
+                log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, "
+                              + "this can be due to the consumer client's metadata being stale or because they have "
+                              + "not been created yet. Please verify that you have created all input topics. When the "
+                              + "metadata is updated a new rebalance will be kicked off automatically and the topology "
+                              + "will retried at that time.", topologyName, missingSourceTopicsPerTopology);

Review comment:
       Should we be clearer here that if users created the input topics and this error occurs because of client's metadata being stale, they actually do not need to do anything except wait, while if they did not create the input topics they should create them? At least that is how I understand it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1137,6 +1138,8 @@ private void completeShutdown(final boolean cleanRun) {
 
         log.info("Shutting down");
 
+        topologyMetadata.unregisterThread(threadMetadata.threadName());

Review comment:
       Why this move? Just curious...

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -85,24 +86,51 @@ public void setup() {
                 );
             }
         }
+
+        return missingExternalSourceTopicsPerTopology;
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
-                                                                           final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));
+    /**
+     * @param topicGroups                            information about the topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg which topics exist on the brokers
+     * @param topologiesWithMissingSourceTopics  set of missing user input topics, to be filled in by this method
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups,
+                                                                           final Cluster clusterMetadata,
+                                                                           final Map<String, Set<String>> topologiesWithMissingSourceTopics) {

Review comment:
       nit: Could you rename `topologiesWithMissingSourceTopics` to `missingExternalSourceTopicsPerTopology` or the other way around. It makes reading the code a bit easier. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );
+                if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                    throw new MissingSourceTopicException("Missing source topics.");
+                }
+            }
+
+            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();

Review comment:
       Could you put this block again into a method and remove the checkstyle suppression?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );
+                if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                    throw new MissingSourceTopicException("Missing source topics.");
+                }

Review comment:
       What happens if all named topologies have missing source topics? Do you account for that somewhere else?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r781827295



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -436,9 +436,25 @@ public String getStoreForChangelogTopic(final String topicName) {
         return sourceTopics;
     }
 
-    public Map<Subtopology, TopicsInfo> topicGroups() {
+    /**
+     * @param topologiesToExclude the names of any topologies to exclude from the returned topic groups,
+     *                            eg because they have missing source topics and can't be processed yet
+     */
+    public Map<Subtopology, TopicsInfo> topicGroups(final Set<String> topologiesToExclude) {

Review comment:
       Filed https://issues.apache.org/jira/browse/KAFKA-13590




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r781814872



##########
File path: checkstyle/suppressions.xml
##########
@@ -158,7 +158,7 @@
               files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/>
 
     <suppress checks="MethodLength"
-              files="KTableImpl.java"/>
+              files="(KTableImpl|StreamsPartitionAssignor).java"/>

Review comment:
       I would prefer to leave out any nontrivial refactoring such as breaking up the assignor's ridiculously long `assign` method, though I do agree that it needs to be broken up (again -- I actually did a pretty hefty refactoring to clean this method up a while back, I guess it's just gotten out of control again since then :/ )
   
   edit: nevermind, I will clean it up in this PR and remove the suppression. Lol it's hard to argue against something you and Guozhang left like 5 comments to request 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r781818108



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' number of partitions
-            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = new RepartitionTopics(
+                taskManager.topologyMetadata(),
+                internalTopicManager,
+                copartitionedTopicsEnforcer,
+                metadata,
+                logPrefix
+            );
+
+            final Map<String, Set<String>> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+            if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+                log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                              "have been pre-created before starting the Streams application. ",
+                          taskManager.topologyMetadata().hasNamedTopologies() ?
+                              missingExternalSourceTopicsPerTopology :
+                              missingExternalSourceTopicsPerTopology.values()
+                );
+                if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                    throw new MissingSourceTopicException("Missing source topics.");
+                }

Review comment:
       Yep (what Guozhang said)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org