Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 04:03:13 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

ableegoldman opened a new pull request #8803:
URL: https://github.com/apache/kafka/pull/8803


   Rather than recreating the entire topology when we find new regex-matched input partitions, we can just update the relevant pieces.
   
   Only the SourceNodes and the ProcessorTopology seem to care about the input topics, and we can extract this information from the SourceNode altogether by moving it into the ProcessorTopology.
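As a rough illustration of what an in-place update would consume, the sketch below computes, for each source node subscribed by pattern, which of a given list of topic names its pattern matches. The names `RegexSourceTopics` and `matchTopics` are hypothetical, and requiring the whole topic name to match (`Matcher.matches()`) is an assumption of the sketch. The result has the same `Map<String, List<String>>` shape (source node name to topics) as the `nodeToSourceTopics` parameter in the diffs in this thread.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Streams: maps each pattern-subscribed
// source node name to the known topics its pattern matches.
final class RegexSourceTopics {

    static Map<String, List<String>> matchTopics(final Map<String, Pattern> patternByNode,
                                                 final Collection<String> knownTopics) {
        // TreeMap keeps node names sorted so the result is deterministic
        final Map<String, List<String>> nodeToSourceTopics = new TreeMap<>();
        for (final Map.Entry<String, Pattern> entry : patternByNode.entrySet()) {
            final List<String> matched = new ArrayList<>();
            for (final String topic : knownTopics) {
                // matches() requires the entire topic name to match the pattern
                if (entry.getValue().matcher(topic).matches()) {
                    matched.add(topic);
                }
            }
            Collections.sort(matched); // stable topic order per node
            nodeToSourceTopics.put(entry.getKey(), matched);
        }
        return nodeToSourceTopics;
    }
}
```

Since the source node names do not change when new topics match, only this name-to-topics mapping needs to be swapped; the nodes themselves can be kept.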
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-639871254


   Retest this please.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r436187924



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -495,12 +496,9 @@ public void closeDirty() {
     }
 
     @Override
-    public void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology) {
-        super.update(topicPartitions, processorTopology);
+    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {

Review comment:
       Ack, done







[GitHub] [kafka] mjsax merged pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #8803:
URL: https://github.com/apache/kafka/pull/8803


   





[GitHub] [kafka] mjsax commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-639873426


   Retest this please.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r436093714



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -131,6 +139,16 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
+    public void updateSourceTopics(final Map<String, List<String>> nodeToSourceTopics) {

Review comment:
       good call







[GitHub] [kafka] ableegoldman commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-640741696


   All tests passed. Can we merge, @mjsax?





[GitHub] [kafka] ableegoldman commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r436096665



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -167,11 +185,21 @@ public String toString() {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
+        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourcesByTopic.entrySet()) {
+            final String topic = sourceNodeEntry.getKey();
+            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+            sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
+            sourceToTopics.get(source).add(topic);
+        }
+
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (final SourceNode<?, ?> source : sourcesByTopic.values()) {
-            sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t", source.children()));
+        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {

Review comment:
       Well, it's in topological order in the sense that we start from the source nodes and proceed through their children.
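To make the ordering point concrete, here is a minimal sketch using a hypothetical `PrintNode` type (not Kafka's `SourceNode`/`ProcessorNode`). It inverts a topic-to-source map into source-to-topics with a single `computeIfAbsent(...).add(...)` call, then prints each source followed by its descendants depth-first. It uses a `LinkedHashMap` where the diff uses a `HashMap`, so sources print in the order they are first encountered; the output format is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified node type for illustration only.
final class PrintNode {
    final String name;
    final List<PrintNode> children = new ArrayList<>();
    PrintNode(final String name) { this.name = name; }
}

final class TopologyPrinter {

    // Invert topic -> source into source -> topics, keeping first-seen source order.
    static Map<PrintNode, List<String>> topicsBySource(final Map<String, PrintNode> sourcesByTopic) {
        final Map<PrintNode, List<String>> result = new LinkedHashMap<>();
        for (final Map.Entry<String, PrintNode> e : sourcesByTopic.entrySet()) {
            result.computeIfAbsent(e.getValue(), s -> new ArrayList<>()).add(e.getKey());
        }
        return result;
    }

    // Print each source once with its topics, then walk its children depth-first.
    static String print(final Map<String, PrintNode> sourcesByTopic) {
        final StringBuilder sb = new StringBuilder("ProcessorTopology:\n");
        for (final Map.Entry<PrintNode, List<String>> e : topicsBySource(sourcesByTopic).entrySet()) {
            sb.append('\t').append(e.getKey().name).append(" topics: ").append(e.getValue()).append('\n');
            appendChildren(sb, "\t\t", e.getKey());
        }
        return sb.toString();
    }

    private static void appendChildren(final StringBuilder sb, final String indent, final PrintNode node) {
        for (final PrintNode child : node.children) {
            sb.append(indent).append(child.name).append('\n');
            appendChildren(sb, indent + "\t", child);
        }
    }
}
```

A depth-first walk from each source prints parents before children, but a node reachable from two sources is printed under each of them; it is "topological" in the sense described above, not a deduplicated topological sort.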







[GitHub] [kafka] abbccdda commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r436125793



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -167,12 +195,36 @@ public String toString() {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {

Review comment:
       I'd prefer we file a newbie ticket to cover this logic with a unit test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -495,12 +496,9 @@ public void closeDirty() {
     }
 
     @Override
-    public void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology) {
-        super.update(topicPartitions, processorTopology);
+    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {

Review comment:
       Could we add a unit test in `StreamTaskTest`?







[GitHub] [kafka] mjsax commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r436188966



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -512,18 +510,16 @@ public void closeAndRecycleState() {
         }
         switch (state()) {
             case CREATED:
-            case RUNNING:
             case RESTORING:
+            case RUNNING:
             case SUSPENDED:
                 stateMgr.recycle();
                 recordCollector.close();
                 break;
-
             case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
-
+                throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
             default:
-                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
+                throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);

Review comment:
       Thanks for the cleanup!
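Because the first four `case` labels fall through to the same body, swapping `RUNNING` and `RESTORING` in the diff is purely cosmetic; the substantive change is the "recycling" wording in the exception messages. A minimal sketch with a hypothetical `TaskState` enum (not Kafka's actual task state type) shows the same shape:

```java
// Hypothetical enum mirroring the states handled in the diff above.
enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

final class RecycleCheck {

    static String recycle(final TaskState state, final String taskId) {
        switch (state) {
            // All four labels share one body, so their order does not matter.
            case CREATED:
            case RESTORING:
            case RUNNING:
            case SUSPENDED:
                // In the real task, this is where the state manager is recycled
                // and the record collector is closed.
                return "recycled " + taskId;
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state + " while recycling active task " + taskId);
            default:
                throw new IllegalStateException("Unknown state " + state + " while recycling active task " + taskId);
        }
    }
}
```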







[GitHub] [kafka] abbccdda commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r436117858



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -167,11 +185,21 @@ public String toString() {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
+        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourcesByTopic.entrySet()) {
+            final String topic = sourceNodeEntry.getKey();
+            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+            sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
+            sourceToTopics.get(source).add(topic);
+        }
+
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (final SourceNode<?, ?> source : sourcesByTopic.values()) {
-            sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t", source.children()));
+        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {

Review comment:
       Oh I see







[GitHub] [kafka] mjsax commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-639873540


   Retest this please.





[GitHub] [kafka] mjsax commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r436188699



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -131,6 +142,23 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
+    public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
+        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
+            log.error("Set of source nodes do not match: \n" +
+                "sourceNodesByName = {}\n" +
+                "sourceTopicsByName = {}",
+                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
+            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
+        }
+        sourceNodesByTopic.clear();
+        for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet()) {
+            final String nodeName = sourceEntry.getKey();
+            for (final String topic : sourceEntry.getValue()) {
+                sourceNodesByTopic.put(topic, sourceNodesByName.get(nodeName));

Review comment:
       Should we add a sanity check that no `topic` is added twice?
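One way to implement such a check is to rely on `Map.put` returning the previous value for a key. The sketch below uses a hypothetical `SourceTopicIndex` that stands in for the relevant `ProcessorTopology` fields and stores source node names as strings rather than `SourceNode` objects. Unlike the diff, which clears `sourceNodesByTopic` before repopulating it, this sketch builds a new map first, so a rejected update leaves the old mapping intact.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the ProcessorTopology fields in the diff.
final class SourceTopicIndex {
    private final Set<String> sourceNodeNames;
    private final Map<String, String> sourceNodeByTopic = new HashMap<>();

    SourceTopicIndex(final Set<String> sourceNodeNames) {
        this.sourceNodeNames = new HashSet<>(sourceNodeNames);
    }

    void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
        if (!sourceTopicsByName.keySet().equals(sourceNodeNames)) {
            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
        }
        // Build into a fresh map so a failed check does not clobber the current mapping.
        final Map<String, String> updated = new HashMap<>();
        for (final Map.Entry<String, List<String>> entry : sourceTopicsByName.entrySet()) {
            for (final String topic : entry.getValue()) {
                // put() returns the previous value, so non-null means the topic was already assigned
                // (this also catches a topic listed twice for the same node).
                final String previous = updated.put(topic, entry.getKey());
                if (previous != null) {
                    throw new IllegalStateException(
                        "Topic " + topic + " is assigned to both " + previous + " and " + entry.getKey());
                }
            }
        }
        sourceNodeByTopic.clear();
        sourceNodeByTopic.putAll(updated);
    }

    String sourceFor(final String topic) {
        return sourceNodeByTopic.get(topic);
    }
}
```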







[GitHub] [kafka] ableegoldman commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-639245176


   Ready for review @abbccdda @guozhangwang 





[GitHub] [kafka] ableegoldman commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-639895726


   Builds failed with 
   `15:42:01 java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached`
   :/ 





[GitHub] [kafka] abbccdda commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r435990496



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -149,6 +150,29 @@ public void shouldGetTerminalNodes() {
         assertThat(processorTopology.terminalNodes(), equalTo(mkSet("processor-2", "sink-1")));
     }
 
+    @Test
+    public void shouldUpdateSourceTopicsWithNewMatchingTopic() {
+        topology.addSource("source-1", "topic-1");
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+        assertNull(processorTopology.source("topic-2"));
+        processorTopology.updateSourceTopics(Collections.singletonMap("source-1", asList("topic-1", "topic-2")));
+
+        assertThat(processorTopology.source("topic-2"), instanceOf(SourceNode.class));

Review comment:
       Why don't we just verify the name of `processorTopology.source("topic-2")` to be `source-1`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -131,6 +139,16 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
+    public void updateSourceTopics(final Map<String, List<String>> nodeToSourceTopics) {

Review comment:
       For this update function, should we also check `sourceNodesByName` and make sure its key set matches, throwing an `IllegalStateException` otherwise? I think this is legitimate insurance before @cadonna starts the topology revolution work.
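A sketch of that key-set check, written as a hypothetical standalone helper (not code from the PR), that also reports which source node names are missing from or unexpected in the update, so the resulting `IllegalStateException` is easier to debug:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: validates an update against the existing source node names
// before any state is modified.
final class SourceNodeCheck {

    static void requireMatchingSourceNodes(final Set<String> existing,
                                           final Map<String, List<String>> update) {
        // Names we know about but the update does not mention
        final Set<String> missing = new TreeSet<>(existing);
        missing.removeAll(update.keySet());
        // Names the update mentions but we do not know about
        final Set<String> unexpected = new TreeSet<>(update.keySet());
        unexpected.removeAll(existing);
        if (!missing.isEmpty() || !unexpected.isEmpty()) {
            throw new IllegalStateException(
                "Source nodes do not match: missing=" + missing + ", unexpected=" + unexpected);
        }
    }
}
```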

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -167,11 +185,21 @@ public String toString() {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
+        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourcesByTopic.entrySet()) {
+            final String topic = sourceNodeEntry.getKey();
+            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+            sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
+            sourceToTopics.get(source).add(topic);
+        }
+
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (final SourceNode<?, ?> source : sourcesByTopic.values()) {
-            sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t", source.children()));
+        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {

Review comment:
       Does the new map ensure a topological order?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -270,16 +270,7 @@ private SourceNodeFactory(final String name,
 
         @Override
         public ProcessorNode<K, V> build() {
-            final List<String> sourceTopics = nodeToSourceTopics.get(name);
-
-            // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
-            // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder;
-            // this should only happen for debugging since during runtime this function should always be called after the metadata has updated.
-            if (sourceTopics == null) {
-                return new SourceNode<>(name, Collections.singletonList(String.valueOf(pattern)), timestampExtractor, keyDeserializer, valDeserializer);
-            } else {
-                return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer);
-            }
+            return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer);

Review comment:
       Nice to simplify :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
##########
@@ -112,7 +106,7 @@ public String toString() {
     /**
      * @return a string representation of this node starting with the given indent, useful for debugging.
      */
-    public String toString(final String indent) {
+    public String toString(final String indent, final List<String> topics) {

Review comment:
       We do not need to pass in the topics here; just move the topic-string creation logic into `ProcessorTopology`, since the data is stored there.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -167,11 +185,21 @@ public String toString() {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
+        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourcesByTopic.entrySet()) {
+            final String topic = sourceNodeEntry.getKey();
+            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+            sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
+            sourceToTopics.get(source).add(topic);
+        }
+
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (final SourceNode<?, ?> source : sourcesByTopic.values()) {
-            sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t", source.children()));
+        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {
+            final SourceNode<?, ?> source  = sourceNodeEntry.getKey();

Review comment:
       nit: extra space







[GitHub] [kafka] mjsax commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-640113151


   Java 8 and Java 11 failed with an environment issue. Java 14 passed.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#discussion_r436177038



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -167,12 +195,36 @@ public String toString() {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {

Review comment:
       SG







[GitHub] [kafka] mjsax commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-640934234


   Merged to `trunk` and cherry-picked to `2.6` branch.





[GitHub] [kafka] mjsax commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-640113179


   Retest this please.





[GitHub] [kafka] mjsax commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-639970287


   Retest this please.

