Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 15:32:35 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

abbccdda commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r435990496



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -149,6 +150,29 @@ public void shouldGetTerminalNodes() {
         assertThat(processorTopology.terminalNodes(), equalTo(mkSet("processor-2", "sink-1")));
     }
 
+    @Test
+    public void shouldUpdateSourceTopicsWithNewMatchingTopic() {
+        topology.addSource("source-1", "topic-1");
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+        assertNull(processorTopology.source("topic-2"));
+        processorTopology.updateSourceTopics(Collections.singletonMap("source-1", asList("topic-1", "topic-2")));
+
+        assertThat(processorTopology.source("topic-2"), instanceOf(SourceNode.class));

Review comment:
       Why don't we just verify that `processorTopology.source("topic-2")` is named `source-1`?
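A minimal sketch of the suggested check. It uses hypothetical stand-in classes (`SourceNameAssertionSketch` with nested `Topology` and `SourceNode`) rather than Kafka's internal ones, so it runs on its own. Asserting on the name pins down *which* source now reads `topic-2`, which `instanceOf(SourceNode.class)` cannot do. In the real test this would presumably look like `assertThat(processorTopology.source("topic-2").name(), equalTo("source-1"))`, assuming the returned node exposes `name()` the way `ProcessorNode` does.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for ProcessorTopology/SourceNode, reduced to what the assertion needs.
class SourceNameAssertionSketch {

    static final class SourceNode {
        private final String name;
        SourceNode(final String name) { this.name = name; }
        String name() { return name; }
    }

    static final class Topology {
        private final Map<String, SourceNode> sourcesByName = new HashMap<>();
        private final Map<String, SourceNode> sourcesByTopic = new HashMap<>();

        void addSource(final String name, final String topic) {
            final SourceNode node = new SourceNode(name);
            sourcesByName.put(name, node);
            sourcesByTopic.put(topic, node);
        }

        SourceNode source(final String topic) { return sourcesByTopic.get(topic); }

        // Rebuild the topic -> node index from the new subscription, reusing the existing nodes.
        void updateSourceTopics(final Map<String, List<String>> nodeToSourceTopics) {
            sourcesByTopic.clear();
            for (final Map.Entry<String, List<String>> entry : nodeToSourceTopics.entrySet()) {
                final SourceNode node = sourcesByName.get(entry.getKey());
                for (final String topic : entry.getValue()) {
                    sourcesByTopic.put(topic, node);
                }
            }
        }
    }

    public static void main(final String[] args) {
        final Topology topology = new Topology();
        topology.addSource("source-1", "topic-1");
        if (topology.source("topic-2") != null) {
            throw new AssertionError("topic-2 should be unknown before the update");
        }

        topology.updateSourceTopics(Collections.singletonMap("source-1", Arrays.asList("topic-1", "topic-2")));

        // Checking the name identifies which source reads topic-2, not merely that some source does.
        final String name = topology.source("topic-2").name();
        if (!"source-1".equals(name)) {
            throw new AssertionError("expected source-1 but got " + name);
        }
        System.out.println("topic-2 -> " + name);
    }
}
```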

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -131,6 +139,16 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
+    public void updateSourceTopics(final Map<String, List<String>> nodeToSourceTopics) {

Review comment:
       For this update function, should we also check `sourceNodesByName` and make sure the key sets match, throwing an `IllegalStateException` otherwise? I think this is a legitimate safeguard before @cadonna starts the topology revolution work.
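A minimal sketch of that guard, assuming `updateSourceTopics` receives exactly one entry per source node and that a mismatch should surface as an `IllegalStateException`. The class and its nested `SourceNode` are hypothetical stand-ins, not Kafka's `ProcessorTopology`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ProcessorTopology, keeping only the two source-node indexes.
class SourceTopicUpdateValidationSketch {

    static final class SourceNode {
        final String name;
        SourceNode(final String name) { this.name = name; }
    }

    private final Map<String, SourceNode> sourceNodesByName = new HashMap<>();
    private final Map<String, SourceNode> sourceNodesByTopic = new HashMap<>();

    void addSource(final String name) {
        sourceNodesByName.put(name, new SourceNode(name));
    }

    SourceNode source(final String topic) {
        return sourceNodesByTopic.get(topic);
    }

    void updateSourceTopics(final Map<String, List<String>> nodeToSourceTopics) {
        // Validate before mutating, so a rejected update leaves the existing mapping intact.
        // A different key set means the topology itself changed and must be rebuilt instead.
        if (!sourceNodesByName.keySet().equals(nodeToSourceTopics.keySet())) {
            throw new IllegalStateException("Source nodes " + nodeToSourceTopics.keySet()
                + " do not match the topology's source nodes " + sourceNodesByName.keySet());
        }
        sourceNodesByTopic.clear();
        for (final Map.Entry<String, List<String>> entry : nodeToSourceTopics.entrySet()) {
            final SourceNode node = sourceNodesByName.get(entry.getKey());
            for (final String topic : entry.getValue()) {
                sourceNodesByTopic.put(topic, node);
            }
        }
    }

    public static void main(final String[] args) {
        final SourceTopicUpdateValidationSketch topology = new SourceTopicUpdateValidationSketch();
        topology.addSource("source-1");
        topology.updateSourceTopics(Collections.singletonMap("source-1", Arrays.asList("topic-1", "topic-2")));
        System.out.println("topic-2 -> " + topology.source("topic-2").name);

        try {
            // "source-2" is not a source node of this topology, so the update is rejected.
            topology.updateSourceTopics(Collections.singletonMap("source-2", Arrays.asList("topic-3")));
        } catch (final IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```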

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -167,11 +185,21 @@ public String toString() {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
+        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourcesByTopic.entrySet()) {
+            final String topic = sourceNodeEntry.getKey();
+            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+            sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
+            sourceToTopics.get(source).add(topic);
+        }
+
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (final SourceNode<?, ?> source : sourcesByTopic.values()) {
-            sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t", source.children()));
+        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {

Review comment:
       Does the new map ensure a topological order?
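For reference, `HashMap` makes no iteration-order guarantee; if `SourceNode` relies on the default identity `hashCode()`, the order could even differ between runs. A minimal sketch, assuming the goal is simply a stable order: group with a `LinkedHashMap`, which iterates in first-insertion order. This only preserves whatever order the input map (`sourcesByTopic` in the diff) already has; it does not by itself produce a topological order. The sketch keys by source name rather than by node, and `groupBySource` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupTopicsBySourceSketch {

    // Invert a topic -> source-name index into source-name -> topics.
    // LinkedHashMap iterates sources in the order they are first seen in the input,
    // so the result is only as ordered as the input map's own iteration order.
    static Map<String, List<String>> groupBySource(final Map<String, String> sourceByTopic) {
        final Map<String, List<String>> topicsBySource = new LinkedHashMap<>();
        for (final Map.Entry<String, String> entry : sourceByTopic.entrySet()) {
            // computeIfAbsent returns the (possibly new) list, so the lookup and add happen in one step.
            topicsBySource.computeIfAbsent(entry.getValue(), s -> new ArrayList<>()).add(entry.getKey());
        }
        return topicsBySource;
    }

    public static void main(final String[] args) {
        final Map<String, String> sourceByTopic = new LinkedHashMap<>();
        sourceByTopic.put("topic-1", "source-1");
        sourceByTopic.put("topic-3", "source-2");
        sourceByTopic.put("topic-2", "source-1");
        // prints {source-1=[topic-1, topic-2], source-2=[topic-3]}
        System.out.println(groupBySource(sourceByTopic));
    }
}
```

Using the list returned by `computeIfAbsent` directly also removes the separate `get(source)` call in the diff.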

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -270,16 +270,7 @@ private SourceNodeFactory(final String name,
 
         @Override
         public ProcessorNode<K, V> build() {
-            final List<String> sourceTopics = nodeToSourceTopics.get(name);
-
-            // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
-            // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder;
-            // this should only happen for debugging since during runtime this function should always be called after the metadata has updated.
-            if (sourceTopics == null) {
-                return new SourceNode<>(name, Collections.singletonList(String.valueOf(pattern)), timestampExtractor, keyDeserializer, valDeserializer);
-            } else {
-                return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer);
-            }
+            return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer);

Review comment:
       Nice to simplify :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
##########
@@ -112,7 +106,7 @@ public String toString() {
     /**
      * @return a string representation of this node starting with the given indent, useful for debugging.
      */
-    public String toString(final String indent) {
+    public String toString(final String indent, final List<String> topics) {

Review comment:
       We don't need to pass the topics in here; just move the topic-string creation logic into `ProcessorTopology`, since that is where the data is stored.
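A minimal sketch of that split, assuming `SourceNode.toString(indent)` keeps its original single-argument signature and `ProcessorTopology` appends the topic list itself. The helpers `nodeToString` and `sourceToString` and the `topics:` line format are hypothetical, not the existing output format.

```java
import java.util.Arrays;
import java.util.List;

class TopicStringSketch {

    // Hypothetical stand-in for SourceNode.toString(indent): the node only knows its own name.
    static String nodeToString(final String name, final String indent) {
        return indent + name + ":\n";
    }

    // Built on the topology side, which owns the topic -> source mapping,
    // so the node's toString(indent) does not need a topics parameter.
    static String sourceToString(final String name, final List<String> topics, final String indent) {
        return nodeToString(name, indent) + indent + "\ttopics:\t\t" + topics + "\n";
    }

    public static void main(final String[] args) {
        System.out.print(sourceToString("source-1", Arrays.asList("topic-1", "topic-2"), "\t"));
    }
}
```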

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -167,11 +185,21 @@ public String toString() {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
+        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourcesByTopic.entrySet()) {
+            final String topic = sourceNodeEntry.getKey();
+            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+            sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
+            sourceToTopics.get(source).add(topic);
+        }
+
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (final SourceNode<?, ?> source : sourcesByTopic.values()) {
-            sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t", source.children()));
+        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {
+            final SourceNode<?, ?> source  = sourceNodeEntry.getKey();

Review comment:
       nit: extra space before `=`
