Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/14 06:38:10 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #11601: KAFKA-12648: Minor fixes for input topic management

ableegoldman opened a new pull request #11601:
URL: https://github.com/apache/kafka/pull/11601


   While working on [#11600](https://github.com/apache/kafka/pull/11600), I noticed a few issues with how we manage topics in the TopologyMetadata, particularly in the code that updates and tracks input topics. This PR cleans that up and adds some further verification when processing possible updates from the subscription metadata or the assignment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11601: KAFKA-12648: Minor fixes for input topic management

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11601:
URL: https://github.com/apache/kafka/pull/11601#discussion_r769107915



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -2067,20 +2073,24 @@ private boolean hasSubscriptionUpdates() {
         return !subscriptionUpdates.isEmpty();
     }
 
-    synchronized void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
+    /**
+     * @return the set of topics that were newly
+     */
+    synchronized Set<String> addSubscribedTopicsFromAssignment(final Set<String> topics, final String logPrefix) {
         if (usesPatternSubscription()) {

Review comment:
       We don't want or need to add topics from a named topology (I mean "modular topology" 😉) to the subscription if that topology hasn't been added to the client yet, since the client won't be able to process, or even correctly route, any records it consumes from those topics until it knows about the actual topology. Subscribing early doesn't necessarily hurt; the thread would just end up prematurely consuming records from those topics, which would "waste" memory until the topology is added.
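
For illustration only, here is a minimal, self-contained sketch of the "skip topologies the client doesn't know yet" idea described above. It is not the code from the PR: `AssignmentGrouping`, `AssignedTask`, and `groupByKnownTopology` are hypothetical names, and plain strings stand in for the real `TaskId` and `TopicPartition` types.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssignmentGrouping {

    // Hypothetical stand-in for one (TaskId, Set<TopicPartition>) entry of the real assignment.
    static final class AssignedTask {
        final String topologyName;
        final Set<String> topics;

        AssignedTask(final String topologyName, final Set<String> topics) {
            this.topologyName = topologyName;
            this.topics = topics;
        }
    }

    // Groups assigned topics by topology, skipping topologies this client has not added yet.
    static Map<String, Set<String>> groupByKnownTopology(final List<AssignedTask> tasks,
                                                         final Set<String> knownTopologies) {
        final Map<String, Set<String>> topicsByTopology = new HashMap<>();
        for (final AssignedTask task : tasks) {
            if (knownTopologies.contains(task.topologyName)) {
                topicsByTopology
                    .computeIfAbsent(task.topologyName, t -> new HashSet<>())
                    .addAll(task.topics);
            }
        }
        return topicsByTopology;
    }

    public static void main(final String[] args) {
        final List<AssignedTask> tasks = Arrays.asList(
            new AssignedTask("orders", new HashSet<>(Arrays.asList("orders-input"))),
            new AssignedTask("payments", new HashSet<>(Arrays.asList("payments-input"))));
        final Set<String> known = new HashSet<>(Arrays.asList("orders"));
        // Only the "orders" topology is known, so "payments-input" is left out of the result.
        System.out.println(groupByKnownTopology(tasks, known));
    }
}
```

In this sketch the topics of the unknown "payments" topology are simply dropped; they would be picked up once that topology is added to the client.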







[GitHub] [kafka] ableegoldman commented on a change in pull request #11601: KAFKA-12648: Minor fixes for input topic management

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11601:
URL: https://github.com/apache/kafka/pull/11601#discussion_r769108804



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -2067,20 +2073,24 @@ private boolean hasSubscriptionUpdates() {
         return !subscriptionUpdates.isEmpty();
     }
 
-    synchronized void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
+    /**
+     * @return the set of topics that were newly
+     */
+    synchronized Set<String> addSubscribedTopicsFromAssignment(final Set<String> topics, final String logPrefix) {
         if (usesPatternSubscription()) {

Review comment:
       But anyway, the main fix in this PR is to make sure we only update each InternalTopologyBuilder with the topics that particular topology is subscribed to, so we shouldn't be invoking this method on topics that don't yet have a corresponding modular topology to begin with.
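
As a rough sketch of what "only update each builder with the topics that topology is subscribed to" could look like, the example below maps every subscribed topic to the topologies that claim it, either by explicit name or by pattern. All names here (`TopicRouting`, `TopologySources`, `subscribersByTopic`) are hypothetical and only illustrate the idea; the real logic lives in `TopologyMetadata` and `InternalTopologyBuilder`.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class TopicRouting {

    // Hypothetical, simplified view of one topology's source topics and source patterns.
    static final class TopologySources {
        final String name;
        final Set<String> explicitTopics;
        final List<Pattern> patterns;

        TopologySources(final String name, final Set<String> explicitTopics, final List<Pattern> patterns) {
            this.name = name;
            this.explicitTopics = explicitTopics;
            this.patterns = patterns;
        }

        // A topology subscribes to a topic if it names it explicitly or one of its patterns matches.
        boolean subscribesTo(final String topic) {
            if (explicitTopics.contains(topic)) {
                return true;
            }
            for (final Pattern pattern : patterns) {
                if (pattern.matcher(topic).matches()) {
                    return true;
                }
            }
            return false;
        }
    }

    // For each topic, collect the names of all topologies that subscribe to it.
    // An empty set means "unknown source"; more than one name means a duplicate input topic.
    static Map<String, Set<String>> subscribersByTopic(final Collection<TopologySources> topologies,
                                                       final Set<String> topics) {
        final Map<String, Set<String>> result = new TreeMap<>();
        for (final String topic : topics) {
            final Set<String> subscribers = new TreeSet<>();
            for (final TopologySources topology : topologies) {
                if (topology.subscribesTo(topic)) {
                    subscribers.add(topology.name);
                }
            }
            result.put(topic, subscribers);
        }
        return result;
    }
}
```

Each builder would then only be handed the topics whose subscriber set contains exactly that topology's name.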







[GitHub] [kafka] ableegoldman commented on a change in pull request #11601: KAFKA-12648: Minor fixes for input topic management

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11601:
URL: https://github.com/apache/kafka/pull/11601#discussion_r769104352



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +453,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
+        if (usesPatternSubscription()) {
+            final Map<String, Set<String>> newTopicsByTopology = new HashMap<>();
+            final Set<String> duplicateInputTopics = new HashSet<>();
+            final Set<String> subscriptionTopicsWithUnknownSource = new HashSet<>();
+            for (final String topic : topics) {
+                final Set<String> subscribingTopologies = new HashSet<>();
+                applyToEachBuilder(b -> {
+                    if (b.sourceTopicCollection().contains(topic) || b.matchesSubscribedPattern(topic)) {
+                        subscribingTopologies.add(getTopologyNameOrElseUnnamed(b.topologyName()));
+                    }
+                });
+                if (subscribingTopologies.size() > 1) {
+                    log.error("{}Subscribed topic {} matches more than one topology: {}",
+                        logPrefix, topic, subscribingTopologies);
+                    duplicateInputTopics.add(topic);
+                } else if (subscribingTopologies.isEmpty()) {
+                    log.error("{}Topic {} is subscribed to by the consumer, but no topology can be identified " +
+                        "that is subscribing to that topic or to a matching Pattern", logPrefix, topic);
+                    subscriptionTopicsWithUnknownSource.add(topic);
+                }
+                for (final String topology : subscribingTopologies) {
+                    if (!allInputTopics.contains(topic)) {
+                        newTopicsByTopology
+                            .computeIfAbsent(topology, t -> new HashSet<>())
+                            .add(topic);
+                    }
+                }
+            }
+
+            if (!duplicateInputTopics.isEmpty()) {
+                throw new IllegalStateException("The following topics are subscribed to " +
+                    "by multiple topologies: " + duplicateInputTopics);
+            } else if (subscriptionTopicsWithUnknownSource.isEmpty()) {
+                throw new IllegalStateException("The following topics appear in the subscription " +

Review comment:
       Yep, just haven't had time to do tests for this yet -- clearly they're needed 🙂 







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11601: KAFKA-12648: Minor fixes for input topic management

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11601:
URL: https://github.com/apache/kafka/pull/11601#discussion_r768916262



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -2067,20 +2073,24 @@ private boolean hasSubscriptionUpdates() {
         return !subscriptionUpdates.isEmpty();
     }
 
-    synchronized void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
+    /**
+     * @return the set of topics that were newly

Review comment:
       that were newly what?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1348,14 +1346,7 @@ private String decorateTopic(final String topic) {
     }
 
     void initializeSubscription() {

Review comment:
       Do we really need this empty method?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1383,14 +1374,29 @@ boolean usesPatternSubscription() {
         return !nodeToSourcePatterns.isEmpty();
     }
 
-    synchronized Collection<String> sourceTopicCollection() {
+    boolean matchesSubscribedPattern(final String topic) {
+        return nodeToSourcePatterns.entrySet().stream().anyMatch(p -> p.getValue().pattern().matches(topic));
+    }
+
+    /**
+     * NOTE: this should not be invoked until the topology has been built via
+     * {@link #buildTopology()}, otherwise the collection will be uninitialized
+     *
+     * @return all user and internal source topics in this topology,
+     *         or {@code null} if uninitialized because {@link #buildTopology()} hs not been called yet
+     */
+    public synchronized List<String> sourceTopicCollection() {

Review comment:
       The method says Collection but now returns a list...

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -2067,20 +2073,24 @@ private boolean hasSubscriptionUpdates() {
         return !subscriptionUpdates.isEmpty();
     }
 
-    synchronized void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
+    /**
+     * @return the set of topics that were newly
+     */
+    synchronized Set<String> addSubscribedTopicsFromAssignment(final Set<String> topics, final String logPrefix) {
         if (usesPatternSubscription()) {

Review comment:
       I am a little concerned about this. Are pattern subscriptions the only way to get new topics? What about a NamedTopology that hasn't been added to this client yet? Will this just mean the client will ignore those tasks?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +453,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
+        if (usesPatternSubscription()) {
+            final Map<String, Set<String>> newTopicsByTopology = new HashMap<>();
+            final Set<String> duplicateInputTopics = new HashSet<>();
+            final Set<String> subscriptionTopicsWithUnknownSource = new HashSet<>();
+            for (final String topic : topics) {
+                final Set<String> subscribingTopologies = new HashSet<>();
+                applyToEachBuilder(b -> {
+                    if (b.sourceTopicCollection().contains(topic) || b.matchesSubscribedPattern(topic)) {
+                        subscribingTopologies.add(getTopologyNameOrElseUnnamed(b.topologyName()));
+                    }
+                });
+                if (subscribingTopologies.size() > 1) {
+                    log.error("{}Subscribed topic {} matches more than one topology: {}",
+                        logPrefix, topic, subscribingTopologies);
+                    duplicateInputTopics.add(topic);
+                } else if (subscribingTopologies.isEmpty()) {
+                    log.error("{}Topic {} is subscribed to by the consumer, but no topology can be identified " +
+                        "that is subscribing to that topic or to a matching Pattern", logPrefix, topic);
+                    subscriptionTopicsWithUnknownSource.add(topic);
+                }
+                for (final String topology : subscribingTopologies) {
+                    if (!allInputTopics.contains(topic)) {
+                        newTopicsByTopology
+                            .computeIfAbsent(topology, t -> new HashSet<>())
+                            .add(topic);
+                    }
+                }
+            }
+
+            if (!duplicateInputTopics.isEmpty()) {
+                throw new IllegalStateException("The following topics are subscribed to " +
+                    "by multiple topologies: " + duplicateInputTopics);
+            } else if (subscriptionTopicsWithUnknownSource.isEmpty()) {
+                throw new IllegalStateException("The following topics appear in the subscription " +

Review comment:
       You might already be on it, but can we add tests for both these cases?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +453,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
+        if (usesPatternSubscription()) {
+            final Map<String, Set<String>> newTopicsByTopology = new HashMap<>();
+            final Set<String> duplicateInputTopics = new HashSet<>();
+            final Set<String> subscriptionTopicsWithUnknownSource = new HashSet<>();
+            for (final String topic : topics) {
+                final Set<String> subscribingTopologies = new HashSet<>();
+                applyToEachBuilder(b -> {
+                    if (b.sourceTopicCollection().contains(topic) || b.matchesSubscribedPattern(topic)) {
+                        subscribingTopologies.add(getTopologyNameOrElseUnnamed(b.topologyName()));
+                    }
+                });
+                if (subscribingTopologies.size() > 1) {
+                    log.error("{}Subscribed topic {} matches more than one topology: {}",
+                        logPrefix, topic, subscribingTopologies);
+                    duplicateInputTopics.add(topic);
+                } else if (subscribingTopologies.isEmpty()) {
+                    log.error("{}Topic {} is subscribed to by the consumer, but no topology can be identified " +
+                        "that is subscribing to that topic or to a matching Pattern", logPrefix, topic);
+                    subscriptionTopicsWithUnknownSource.add(topic);
+                }
+                for (final String topology : subscribingTopologies) {
+                    if (!allInputTopics.contains(topic)) {
+                        newTopicsByTopology
+                            .computeIfAbsent(topology, t -> new HashSet<>())
+                            .add(topic);
+                    }
+                }
+            }
+
+            if (!duplicateInputTopics.isEmpty()) {
+                throw new IllegalStateException("The following topics are subscribed to " +
+                    "by multiple topologies: " + duplicateInputTopics);
+            } else if (subscriptionTopicsWithUnknownSource.isEmpty()) {

Review comment:
       if not empty?
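
To spell out the point, a minimal sketch of the two checks with the second condition negated, which is how the later revision of this hunk quoted further down in this thread reads. `SubscriptionValidation` and `verify` are hypothetical names used only for this example.

```java
import java.util.Set;

public class SubscriptionValidation {

    // Throws if any topic is claimed by several topologies, or by none at all.
    static void verify(final Set<String> duplicateInputTopics,
                       final Set<String> subscriptionTopicsWithUnknownSource) {
        if (!duplicateInputTopics.isEmpty()) {
            throw new IllegalStateException("The following topics are subscribed to " +
                "by multiple topologies: " + duplicateInputTopics);
        } else if (!subscriptionTopicsWithUnknownSource.isEmpty()) {
            // Note the negation: we only fail when there ARE topics with an unknown source.
            throw new IllegalStateException("The following topics appear in the subscription " +
                "but can't be located in the topology: " + subscriptionTopicsWithUnknownSource);
        }
    }
}
```

Without the negation, the happy path (no unknown topics) would throw, and the actual error case would pass silently.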

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +453,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
+        if (usesPatternSubscription()) {
+            final Map<String, Set<String>> newTopicsByTopology = new HashMap<>();
+            final Set<String> duplicateInputTopics = new HashSet<>();
+            final Set<String> subscriptionTopicsWithUnknownSource = new HashSet<>();
+            for (final String topic : topics) {
+                final Set<String> subscribingTopologies = new HashSet<>();
+                applyToEachBuilder(b -> {
+                    if (b.sourceTopicCollection().contains(topic) || b.matchesSubscribedPattern(topic)) {
+                        subscribingTopologies.add(getTopologyNameOrElseUnnamed(b.topologyName()));
+                    }
+                });
+                if (subscribingTopologies.size() > 1) {
+                    log.error("{}Subscribed topic {} matches more than one topology: {}",
+                        logPrefix, topic, subscribingTopologies);
+                    duplicateInputTopics.add(topic);
+                } else if (subscribingTopologies.isEmpty()) {
+                    log.error("{}Topic {} is subscribed to by the consumer, but no topology can be identified " +
+                        "that is subscribing to that topic or to a matching Pattern", logPrefix, topic);
+                    subscriptionTopicsWithUnknownSource.add(topic);
+                }
+                for (final String topology : subscribingTopologies) {
+                    if (!allInputTopics.contains(topic)) {
+                        newTopicsByTopology
+                            .computeIfAbsent(topology, t -> new HashSet<>())
+                            .add(topic);
+                    }
+                }
+            }
+
+            if (!duplicateInputTopics.isEmpty()) {
+                throw new IllegalStateException("The following topics are subscribed to " +
+                    "by multiple topologies: " + duplicateInputTopics);
+            } else if (subscriptionTopicsWithUnknownSource.isEmpty()) {
+                throw new IllegalStateException("The following topics appear in the subscription " +
+                    "but can't be located in the topology: " + subscriptionTopicsWithUnknownSource);
+            }
+
+            updateAndVerifyNewInputTopics(newTopicsByTopology);
+            for (final Map.Entry<String, Set<String>> topology : newTopicsByTopology.entrySet()) {
+                lookupBuilderForNamedTopology(topology.getKey())
+                    .addSubscribedTopicsFromMetadata(topology.getValue(), logPrefix);
+            }
+        } else {
+            if (!topics.equals(sourceTopicCollection())) {
+                log.error("{}Consumer's subscription does not match the known source topics.\n" +
+                        "consumer subscription: {}\n" +
+                        "topics in topology metadata: {}",
+                    logPrefix, topics, sourceTopicCollection());
+                throw new IllegalStateException("Consumer subscribed topics and topology's known topics do not match.");
+            }
+        }
     }
 
-    void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromAssignment(partitions, logPrefix));
+    void addSubscribedTopicsFromAssignment(final Map<TaskId, Set<TopicPartition>> tasks, final String logPrefix) {
+        final Map<String, Set<String>> assignedTopicsByTopology = new HashMap<>();
+
+        for (final Map.Entry<TaskId, Set<TopicPartition>> task : tasks.entrySet()) {
+            final String topologyName = getTopologyNameOrElseUnnamed(task.getKey().topologyName());
+            assignedTopicsByTopology
+                .computeIfAbsent(topologyName, t -> new HashSet<>())
+                .addAll(task.getValue().stream().map(TopicPartition::topic).collect(Collectors.toSet()));
+        }
+        for (final Map.Entry<String, Set<String>> assignedTopics : assignedTopicsByTopology.entrySet()) {
+            final Set<String> newTopics =
+                lookupBuilderForNamedTopology(assignedTopics.getKey())
+                    .addSubscribedTopicsFromAssignment(assignedTopics.getValue(), logPrefix);
+            assignedTopics.getValue().retainAll(newTopics);
+        }
+        updateAndVerifyNewInputTopics(assignedTopicsByTopology);
+    }
+
+    private String getTopologyNameOrElseUnnamed(final String topologyName) {
+        return topologyName == null ? UNNAMED_TOPOLOGY : topologyName;
+    }
+
+    private void updateAndVerifyNewInputTopics(final Map<String, Set<String>> newTopicsByTopology) {
+        final Set<String> duplicateInputTopics = new HashSet<>();
+
+        for (final Map.Entry<String, Set<String>> newTopics : newTopicsByTopology.entrySet()) {
+            final String topologyName = newTopics.getKey();
+            for (final String newTopic : newTopics.getValue()) {
+                if (allInputTopics.contains(newTopic)) {
+                    duplicateInputTopics.add(newTopic);
+                    final String namedTopologyErrorMessagePrefix = UNNAMED_TOPOLOGY.equals(topologyName) ?
+                        "" :
+                        "Topology {}: ";
+                    log.error("{}Cannot add topic {} to the subscription as the application is " +
+                        "already consuming from this topic elsewhere in the topology",
+                        namedTopologyErrorMessagePrefix, newTopic);
+                } else {
+                    allInputTopics.add(newTopic);
+                }
+            }
+        }
+        if (!duplicateInputTopics.isEmpty()) {

Review comment:
       This is unrelated to this PR, but the check brings up a good point. Instead of throwing an error when the pattern subscription finds an overlapping topic, then restarting the thread and trying again, maybe we could pause a topology until the topics are no longer overlapping?
   
   EDIT: I can see this in the other PR







[GitHub] [kafka] guozhangwang commented on a change in pull request #11601: KAFKA-12648: Minor fixes for input topic management

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11601:
URL: https://github.com/apache/kafka/pull/11601#discussion_r769136716



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1383,14 +1370,25 @@ boolean usesPatternSubscription() {
         return !nodeToSourcePatterns.isEmpty();
     }
 
-    synchronized Collection<String> sourceTopicCollection() {
+    boolean matchesSubscribedPattern(final String topic) {
+        return nodeToSourcePatterns.entrySet().stream().anyMatch(p -> p.getValue().pattern().matches(topic));
+    }
+
+    /**
+     * @return full names of all user and internal source topics in this topology, including prefix for internal topics
+     */
+    public synchronized List<String> sourceTopicCollection() {
+        if (sourceTopicCollection == null) {
+            log.debug("No source topics using pattern subscription found, initializing consumer's subscription collection.");
+            sourceTopicCollection = maybeDecorateInternalSourceTopics(sourceTopicNames);
+            Collections.sort(sourceTopicCollection);
+        }
         return sourceTopicCollection;
     }
 
     synchronized String sourceTopicsPatternString() {
-        // With a NamedTopology, it may be that this topology does not use pattern subscription but another one does
-        // in which case we would need to initialize the pattern string where we would otherwise have not
-        if (sourceTopicPatternString == null && hasNamedTopology()) {

Review comment:
       Why do we not need the second condition now?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -465,13 +570,13 @@ private InternalTopologyBuilder lookupBuilderForTask(final TaskId task) {
     }
 
     /**
-     * @return the InternalTopologyBuilder for a NamedTopology, or null if no such NamedTopology exists
+     * @return the InternalTopologyBuilder for a Topology, or null if no such Topology exists

Review comment:
       Shall we change the function name as well?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +453,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
+        if (usesPatternSubscription()) {
+            final Map<String, Set<String>> newTopicsByTopology = new HashMap<>();
+            final Set<String> duplicateInputTopics = new HashSet<>();
+            final Set<String> subscriptionTopicsWithUnknownSource = new HashSet<>();
+            for (final String topic : topics) {
+                final Set<String> subscribingTopologies = new HashSet<>();
+                applyToEachBuilder(b -> {
+                    if (b.sourceTopicCollection().contains(topic) || b.matchesSubscribedPattern(topic)) {
+                        subscribingTopologies.add(getTopologyNameOrElseUnnamed(b.topologyName()));
+                    }
+                });
+                if (subscribingTopologies.size() > 1) {
+                    log.error("{}Subscribed topic {} matches more than one topology: {}",
+                        logPrefix, topic, subscribingTopologies);
+                    duplicateInputTopics.add(topic);
+                } else if (subscribingTopologies.isEmpty()) {
+                    log.error("{}Topic {} is subscribed to by the consumer, but no topology can be identified " +
+                        "that is subscribing to that topic or to a matching Pattern", logPrefix, topic);
+                    subscriptionTopicsWithUnknownSource.add(topic);
+                }
+                for (final String topology : subscribingTopologies) {
+                    if (!allInputTopics.contains(topic)) {
+                        newTopicsByTopology
+                            .computeIfAbsent(topology, t -> new HashSet<>())
+                            .add(topic);
+                    }
+                }
+            }
+
+            if (!duplicateInputTopics.isEmpty()) {
+                throw new IllegalStateException("The following topics are subscribed to " +
+                    "by multiple topologies: " + duplicateInputTopics);
+            } else if (subscriptionTopicsWithUnknownSource.isEmpty()) {
+                throw new IllegalStateException("The following topics appear in the subscription " +

Review comment:
       Should we also check that the topologies in `newTopicsByTopology.keySet()` are all new, i.e. that they do not have existing topics?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -350,8 +351,13 @@ public OffsetResetStrategy offsetResetStrategy(final String topic) {
         return null;
     }
 
-    public Collection<String> sourceTopicCollection() {
-        final List<String> sourceTopics = new ArrayList<>();
+    /**
+     * NOTE: this should not be invoked until all topologies have been built via
+     * {@link #buildAndVerifyTopology(InternalTopologyBuilder)}, otherwise the
+     * {@link InternalTopologyBuilder#sourceTopicCollection()}

Review comment:
       nit: The `otherwise` sentence seems to be incomplete?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +457,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
+        if (usesPatternSubscription()) {
+            final Map<String, Set<String>> newTopicsByTopology = new HashMap<>();
+            final Set<String> duplicateInputTopics = new HashSet<>();
+            final Set<String> subscriptionTopicsWithUnknownSource = new HashSet<>();
+            for (final String topic : topics) {
+                final Set<String> subscribingTopologies = new HashSet<>();
+                applyToEachBuilder(b -> {
+                    if (b.sourceTopicCollection().contains(topic) || b.matchesSubscribedPattern(topic)) {
+                        subscribingTopologies.add(getTopologyNameOrElseUnnamed(b.topologyName()));
+                    }
+                });
+                if (subscribingTopologies.size() > 1) {
+                    log.error("{}Subscribed topic {} matches more than one topology: {}",
+                        logPrefix, topic, subscribingTopologies);
+                    duplicateInputTopics.add(topic);
+                } else if (subscribingTopologies.isEmpty()) {
+                    log.error("{}Topic {} is subscribed to by the consumer, but no topology can be identified " +
+                        "that is subscribing to that topic or to a matching Pattern", logPrefix, topic);
+                    subscriptionTopicsWithUnknownSource.add(topic);
+                }
+                for (final String topology : subscribingTopologies) {
+                    if (!allInputTopics.contains(topic)) {
+                        newTopicsByTopology
+                            .computeIfAbsent(topology, t -> new HashSet<>())
+                            .add(topic);
+                    }
+                }
+            }
+
+            if (!duplicateInputTopics.isEmpty()) {
+                throw new IllegalStateException("The following topics are subscribed to " +
+                    "by multiple topologies: " + duplicateInputTopics);
+            } else if (!subscriptionTopicsWithUnknownSource.isEmpty()) {
+                throw new IllegalStateException("The following topics appear in the subscription " +
+                    "but can't be located in the topology: " + subscriptionTopicsWithUnknownSource);
+            }
+
+            updateAndVerifyNewInputTopics(newTopicsByTopology);
+            for (final Map.Entry<String, Set<String>> topology : newTopicsByTopology.entrySet()) {
+                lookupBuilderForNamedTopology(topology.getKey())
+                    .addSubscribedTopicsFromMetadata(topology.getValue(), logPrefix);
+            }
+        } else {
+            if (!topics.equals(sourceTopicCollection())) {
+                log.error("{}Consumer's subscription does not match the known source topics.\n" +
+                        "consumer subscription: {}\n" +
+                        "topics in topology metadata: {}",
+                    logPrefix, topics, sourceTopicCollection());
+                throw new IllegalStateException("Consumer subscribed topics and topology's known topics do not match.");
+            }
+        }
     }
 
-    void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromAssignment(partitions, logPrefix));
+    void addSubscribedTopicsFromAssignment(final Map<TaskId, Set<TopicPartition>> tasks, final String logPrefix) {
+        final Map<String, Set<String>> assignedTopicsByTopology = new HashMap<>();
+
+        for (final Map.Entry<TaskId, Set<TopicPartition>> task : tasks.entrySet()) {
+            final String topologyName = getTopologyNameOrElseUnnamed(task.getKey().topologyName());
+            // Skip updating subscription with topics if their topology is not yet known to this client,
+            // the subscription will be updated when the topology is added
+            if (builders.containsKey(topologyName)) {
+                assignedTopicsByTopology
+                    .computeIfAbsent(topologyName, t -> new HashSet<>())
+                    .addAll(task.getValue().stream().map(TopicPartition::topic).collect(Collectors.toSet()));
+            }
+        }
+        for (final Map.Entry<String, Set<String>> assignedTopics : assignedTopicsByTopology.entrySet()) {
+            final Set<String> newTopics =
+                lookupBuilderForNamedTopology(assignedTopics.getKey())

Review comment:
       Ditto here: should we check that, before any new topics are added to the topology, its topic set is empty?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +457,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));

Review comment:
       Can we remove the `addSubscribedTopicsFromMetadata` in `InternalTopologyBuilder` then?







[GitHub] [kafka] ableegoldman commented on a change in pull request #11601: KAFKA-12648: Minor fixes for input topic management

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11601:
URL: https://github.com/apache/kafka/pull/11601#discussion_r769102089



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1348,14 +1346,7 @@ private String decorateTopic(final String topic) {
     }
 
     void initializeSubscription() {

Review comment:
       Oh I just forgot to remove the method, thanks



