Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/14 18:38:49 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #11601: KAFKA-12648: Minor fixes for input topic management

wcarlson5 commented on a change in pull request #11601:
URL: https://github.com/apache/kafka/pull/11601#discussion_r768916262



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -2067,20 +2073,24 @@ private boolean hasSubscriptionUpdates() {
         return !subscriptionUpdates.isEmpty();
     }
 
-    synchronized void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
+    /**
+     * @return the set of topics that were newly

Review comment:
       Newly what? The `@return` description is cut off mid-sentence.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1348,14 +1346,7 @@ private String decorateTopic(final String topic) {
     }
 
     void initializeSubscription() {

Review comment:
       Do we really need this empty method?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1383,14 +1374,29 @@ boolean usesPatternSubscription() {
         return !nodeToSourcePatterns.isEmpty();
     }
 
-    synchronized Collection<String> sourceTopicCollection() {
+    boolean matchesSubscribedPattern(final String topic) {
+        return nodeToSourcePatterns.entrySet().stream().anyMatch(p -> p.getValue().pattern().matches(topic));
+    }
+
+    /**
+     * NOTE: this should not be invoked until the topology has been built via
+     * {@link #buildTopology()}, otherwise the collection will be uninitialized
+     *
+     * @return all user and internal source topics in this topology,
+     *         or {@code null} if uninitialized because {@link #buildTopology()} hs not been called yet
+     */
+    public synchronized List<String> sourceTopicCollection() {

Review comment:
       The method name says `Collection`, but it now returns a `List`...

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -2067,20 +2073,24 @@ private boolean hasSubscriptionUpdates() {
         return !subscriptionUpdates.isEmpty();
     }
 
-    synchronized void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
+    /**
+     * @return the set of topics that were newly
+     */
+    synchronized Set<String> addSubscribedTopicsFromAssignment(final Set<String> topics, final String logPrefix) {
         if (usesPatternSubscription()) {

Review comment:
       I am a little concerned about this. Are pattern subscriptions the only way to get new topics? What about a named topology that hasn't been added to this client yet? Will this just mean the client ignores those tasks?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +453,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
+        if (usesPatternSubscription()) {
+            final Map<String, Set<String>> newTopicsByTopology = new HashMap<>();
+            final Set<String> duplicateInputTopics = new HashSet<>();
+            final Set<String> subscriptionTopicsWithUnknownSource = new HashSet<>();
+            for (final String topic : topics) {
+                final Set<String> subscribingTopologies = new HashSet<>();
+                applyToEachBuilder(b -> {
+                    if (b.sourceTopicCollection().contains(topic) || b.matchesSubscribedPattern(topic)) {
+                        subscribingTopologies.add(getTopologyNameOrElseUnnamed(b.topologyName()));
+                    }
+                });
+                if (subscribingTopologies.size() > 1) {
+                    log.error("{}Subscribed topic {} matches more than one topology: {}",
+                        logPrefix, topic, subscribingTopologies);
+                    duplicateInputTopics.add(topic);
+                } else if (subscribingTopologies.isEmpty()) {
+                    log.error("{}Topic {} is subscribed to by the consumer, but no topology can be identified " +
+                        "that is subscribing to that topic or to a matching Pattern", logPrefix, topic);
+                    subscriptionTopicsWithUnknownSource.add(topic);
+                }
+                for (final String topology : subscribingTopologies) {
+                    if (!allInputTopics.contains(topic)) {
+                        newTopicsByTopology
+                            .computeIfAbsent(topology, t -> new HashSet<>())
+                            .add(topic);
+                    }
+                }
+            }
+
+            if (!duplicateInputTopics.isEmpty()) {
+                throw new IllegalStateException("The following topics are subscribed to " +
+                    "by multiple topologies: " + duplicateInputTopics);
+            } else if (subscriptionTopicsWithUnknownSource.isEmpty()) {
+                throw new IllegalStateException("The following topics appear in the subscription " +

Review comment:
       You might already be on it, but can we add tests for both of these cases (topics subscribed to by multiple topologies, and topics that can't be located in any topology)?
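
A minimal sketch of what such tests could exercise, assuming a simplified stand-in for the matching loop. The class `SubscriptionMatchSketch` and its methods are hypothetical and not part of the PR; it only models pattern matching, not the `sourceTopicCollection()` lookup, and real tests would go through `TopologyMetadata` instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical stand-in for the per-topic matching loop in the diff; not the PR's code.
final class SubscriptionMatchSketch {

    // Returns the names of all topologies whose source pattern matches the whole topic name.
    static Set<String> subscribingTopologies(final String topic,
                                             final Map<String, Pattern> patternByTopology) {
        final Set<String> matches = new TreeSet<>();
        for (final Map.Entry<String, Pattern> entry : patternByTopology.entrySet()) {
            if (entry.getValue().matcher(topic).matches()) {
                matches.add(entry.getKey());
            }
        }
        return matches;
    }

    // Mirrors the two failure modes: more than one matching topology, or none at all.
    static void verifyTopic(final String topic, final Map<String, Pattern> patternByTopology) {
        final Set<String> matches = subscribingTopologies(topic, patternByTopology);
        if (matches.size() > 1) {
            throw new IllegalStateException(
                "Topic " + topic + " is subscribed to by multiple topologies: " + matches);
        } else if (matches.isEmpty()) {
            throw new IllegalStateException(
                "Topic " + topic + " can't be located in any topology");
        }
    }

    // Small helper so each case can be checked without a test framework.
    static boolean throwsIllegalState(final Runnable check) {
        try {
            check.run();
            return false;
        } catch (final IllegalStateException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final Map<String, Pattern> patterns = new HashMap<>();
        patterns.put("topology-a", Pattern.compile("input-.*"));
        patterns.put("topology-b", Pattern.compile(".*-orders"));

        // Case 1: the topic matches both patterns, so it should be rejected.
        System.out.println("overlap rejected: "
            + throwsIllegalState(() -> verifyTopic("input-orders", patterns)));
        // Case 2: the topic matches no pattern, so it should be rejected.
        System.out.println("unknown rejected: "
            + throwsIllegalState(() -> verifyTopic("unrelated", patterns)));
        // Control: exactly one topology matches, so nothing is thrown.
        System.out.println("single match rejected: "
            + throwsIllegalState(() -> verifyTopic("input-clicks", patterns)));
    }
}
```

The control case matters as much as the two failures: a test that only asserts the exceptions would still pass if the check threw unconditionally.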

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +453,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
+        if (usesPatternSubscription()) {
+            final Map<String, Set<String>> newTopicsByTopology = new HashMap<>();
+            final Set<String> duplicateInputTopics = new HashSet<>();
+            final Set<String> subscriptionTopicsWithUnknownSource = new HashSet<>();
+            for (final String topic : topics) {
+                final Set<String> subscribingTopologies = new HashSet<>();
+                applyToEachBuilder(b -> {
+                    if (b.sourceTopicCollection().contains(topic) || b.matchesSubscribedPattern(topic)) {
+                        subscribingTopologies.add(getTopologyNameOrElseUnnamed(b.topologyName()));
+                    }
+                });
+                if (subscribingTopologies.size() > 1) {
+                    log.error("{}Subscribed topic {} matches more than one topology: {}",
+                        logPrefix, topic, subscribingTopologies);
+                    duplicateInputTopics.add(topic);
+                } else if (subscribingTopologies.isEmpty()) {
+                    log.error("{}Topic {} is subscribed to by the consumer, but no topology can be identified " +
+                        "that is subscribing to that topic or to a matching Pattern", logPrefix, topic);
+                    subscriptionTopicsWithUnknownSource.add(topic);
+                }
+                for (final String topology : subscribingTopologies) {
+                    if (!allInputTopics.contains(topic)) {
+                        newTopicsByTopology
+                            .computeIfAbsent(topology, t -> new HashSet<>())
+                            .add(topic);
+                    }
+                }
+            }
+
+            if (!duplicateInputTopics.isEmpty()) {
+                throw new IllegalStateException("The following topics are subscribed to " +
+                    "by multiple topologies: " + duplicateInputTopics);
+            } else if (subscriptionTopicsWithUnknownSource.isEmpty()) {

Review comment:
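       Should this be "if not empty", i.e. `!subscriptionTopicsWithUnknownSource.isEmpty()`? As written, it throws when there are no unknown topics.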
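
A minimal sketch of the check with the condition negated, pulled out into a hypothetical standalone helper (`SubscriptionChecks.verify` is an illustrative name, not something in the PR) so the behavior is easy to see in isolation:

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical helper; the parameter names mirror the diff, but this is not the PR's code.
final class SubscriptionChecks {

    static void verify(final Set<String> duplicateInputTopics,
                       final Set<String> subscriptionTopicsWithUnknownSource) {
        if (!duplicateInputTopics.isEmpty()) {
            throw new IllegalStateException("The following topics are subscribed to " +
                "by multiple topologies: " + duplicateInputTopics);
        } else if (!subscriptionTopicsWithUnknownSource.isEmpty()) {
            // Negated check: throw only when there is at least one unknown topic to report.
            throw new IllegalStateException("The following topics appear in the subscription " +
                "but can't be located in the topology: " + subscriptionTopicsWithUnknownSource);
        }
    }

    public static void main(final String[] args) {
        // With both sets empty, the negated condition lets the call return normally.
        verify(Collections.emptySet(), Collections.emptySet());
        System.out.println("no problems: passed");

        // With an unknown topic present, the second branch throws.
        try {
            verify(Collections.emptySet(), Collections.singleton("orphan-topic"));
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```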

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -447,11 +453,106 @@ public String getStoreForChangelogTopic(final String topicName) {
     }
 
     void addSubscribedTopicsFromMetadata(final Set<String> topics, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
+        if (usesPatternSubscription()) {
+            final Map<String, Set<String>> newTopicsByTopology = new HashMap<>();
+            final Set<String> duplicateInputTopics = new HashSet<>();
+            final Set<String> subscriptionTopicsWithUnknownSource = new HashSet<>();
+            for (final String topic : topics) {
+                final Set<String> subscribingTopologies = new HashSet<>();
+                applyToEachBuilder(b -> {
+                    if (b.sourceTopicCollection().contains(topic) || b.matchesSubscribedPattern(topic)) {
+                        subscribingTopologies.add(getTopologyNameOrElseUnnamed(b.topologyName()));
+                    }
+                });
+                if (subscribingTopologies.size() > 1) {
+                    log.error("{}Subscribed topic {} matches more than one topology: {}",
+                        logPrefix, topic, subscribingTopologies);
+                    duplicateInputTopics.add(topic);
+                } else if (subscribingTopologies.isEmpty()) {
+                    log.error("{}Topic {} is subscribed to by the consumer, but no topology can be identified " +
+                        "that is subscribing to that topic or to a matching Pattern", logPrefix, topic);
+                    subscriptionTopicsWithUnknownSource.add(topic);
+                }
+                for (final String topology : subscribingTopologies) {
+                    if (!allInputTopics.contains(topic)) {
+                        newTopicsByTopology
+                            .computeIfAbsent(topology, t -> new HashSet<>())
+                            .add(topic);
+                    }
+                }
+            }
+
+            if (!duplicateInputTopics.isEmpty()) {
+                throw new IllegalStateException("The following topics are subscribed to " +
+                    "by multiple topologies: " + duplicateInputTopics);
+            } else if (subscriptionTopicsWithUnknownSource.isEmpty()) {
+                throw new IllegalStateException("The following topics appear in the subscription " +
+                    "but can't be located in the topology: " + subscriptionTopicsWithUnknownSource);
+            }
+
+            updateAndVerifyNewInputTopics(newTopicsByTopology);
+            for (final Map.Entry<String, Set<String>> topology : newTopicsByTopology.entrySet()) {
+                lookupBuilderForNamedTopology(topology.getKey())
+                    .addSubscribedTopicsFromMetadata(topology.getValue(), logPrefix);
+            }
+        } else {
+            if (!topics.equals(sourceTopicCollection())) {
+                log.error("{}Consumer's subscription does not match the known source topics.\n" +
+                        "consumer subscription: {}\n" +
+                        "topics in topology metadata: {}",
+                    logPrefix, topics, sourceTopicCollection());
+                throw new IllegalStateException("Consumer subscribed topics and topology's known topics do not match.");
+            }
+        }
     }
 
-    void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
-        applyToEachBuilder(b -> b.addSubscribedTopicsFromAssignment(partitions, logPrefix));
+    void addSubscribedTopicsFromAssignment(final Map<TaskId, Set<TopicPartition>> tasks, final String logPrefix) {
+        final Map<String, Set<String>> assignedTopicsByTopology = new HashMap<>();
+
+        for (final Map.Entry<TaskId, Set<TopicPartition>> task : tasks.entrySet()) {
+            final String topologyName = getTopologyNameOrElseUnnamed(task.getKey().topologyName());
+            assignedTopicsByTopology
+                .computeIfAbsent(topologyName, t -> new HashSet<>())
+                .addAll(task.getValue().stream().map(TopicPartition::topic).collect(Collectors.toSet()));
+        }
+        for (final Map.Entry<String, Set<String>> assignedTopics : assignedTopicsByTopology.entrySet()) {
+            final Set<String> newTopics =
+                lookupBuilderForNamedTopology(assignedTopics.getKey())
+                    .addSubscribedTopicsFromAssignment(assignedTopics.getValue(), logPrefix);
+            assignedTopics.getValue().retainAll(newTopics);
+        }
+        updateAndVerifyNewInputTopics(assignedTopicsByTopology);
+    }
+
+    private String getTopologyNameOrElseUnnamed(final String topologyName) {
+        return topologyName == null ? UNNAMED_TOPOLOGY : topologyName;
+    }
+
+    private void updateAndVerifyNewInputTopics(final Map<String, Set<String>> newTopicsByTopology) {
+        final Set<String> duplicateInputTopics = new HashSet<>();
+
+        for (final Map.Entry<String, Set<String>> newTopics : newTopicsByTopology.entrySet()) {
+            final String topologyName = newTopics.getKey();
+            for (final String newTopic : newTopics.getValue()) {
+                if (allInputTopics.contains(newTopic)) {
+                    duplicateInputTopics.add(newTopic);
+                    final String namedTopologyErrorMessagePrefix = UNNAMED_TOPOLOGY.equals(topologyName) ?
+                        "" :
+                        "Topology {}: ";
+                    log.error("{}Cannot add topic {} to the subscription as the application is " +
+                        "already consuming from this topic elsewhere in the topology",
+                        namedTopologyErrorMessagePrefix, newTopic);
+                } else {
+                    allInputTopics.add(newTopic);
+                }
+            }
+        }
+        if (!duplicateInputTopics.isEmpty()) {

Review comment:
       This is unrelated to this PR, but the check brings up a good point. Instead of throwing an error when the pattern subscription finds an overlapping topic, then restarting the thread and trying again, maybe we can pause the topology until the topics are no longer overlapping?
   
   EDIT: I can see this in the other PR



