You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/06/15 08:14:48 UTC

[kafka] branch 2.8 updated: KAFKA-12914: StreamSourceNode should return `null` topic name for pattern subscription (#10846)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new b748340  KAFKA-12914: StreamSourceNode should return `null` topic name for pattern subscription (#10846)
b748340 is described below

commit b748340798028092fce5472490d2e8d53b9a0074
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue Jun 15 00:59:55 2021 -0700

    KAFKA-12914: StreamSourceNode should return `null` topic name for pattern subscription (#10846)
    
    Reviewers: Luke Chen <sh...@gmail.com>, Bruno Cadonna <br...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../streams/kstream/internals/InternalStreamsBuilder.java    | 11 ++++++-----
 .../streams/kstream/internals/graph/SourceGraphNode.java     |  9 +++++----
 .../streams/kstream/internals/graph/StreamSourceNode.java    | 11 +++++------
 .../streams/kstream/internals/graph/TableSourceNode.java     | 12 +++++++++++-
 4 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 37ece74..cd59427 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -330,16 +330,17 @@ public class InternalStreamsBuilder implements InternalNameProvider {
             if (graphNode instanceof StreamSourceNode) {
                 final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;
 
-                if (currentSourceNode.topicPattern() != null) {
-                    if (!patternsToSourceNodes.containsKey(currentSourceNode.topicPattern())) {
-                        patternsToSourceNodes.put(currentSourceNode.topicPattern(), currentSourceNode);
+                if (currentSourceNode.topicPattern().isPresent()) {
+                    final Pattern topicPattern = currentSourceNode.topicPattern().get();
+                    if (!patternsToSourceNodes.containsKey(topicPattern)) {
+                        patternsToSourceNodes.put(topicPattern, currentSourceNode);
                     } else {
-                        final StreamSourceNode<?, ?> mainSourceNode = patternsToSourceNodes.get(currentSourceNode.topicPattern());
+                        final StreamSourceNode<?, ?> mainSourceNode = patternsToSourceNodes.get(topicPattern);
                         mainSourceNode.merge(currentSourceNode);
                         root.removeChild(graphNode);
                     }
                 } else {
-                    for (final String topic : currentSourceNode.topicNames()) {
+                    for (final String topic : currentSourceNode.topicNames().get()) {
                         if (!topicsToSourceNodes.containsKey(topic)) {
                             topicsToSourceNodes.put(topic, currentSourceNode);
                         } else {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
index affce1b..0505227 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals.graph;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.common.serialization.Serde;
@@ -51,12 +52,12 @@ abstract public class SourceGraphNode<K, V> extends GraphNode {
         this.consumedInternal = consumedInternal;
     }
 
-    public Set<String> topicNames() {
-        return Collections.unmodifiableSet(topicNames);
+    public Optional<Set<String>> topicNames() {
+        return topicNames == null ? Optional.empty() : Optional.of(Collections.unmodifiableSet(topicNames));
     }
 
-    public Pattern topicPattern() {
-        return topicPattern;
+    public Optional<Pattern> topicPattern() {
+        return Optional.ofNullable(topicPattern);
     }
 
     public ConsumedInternal<K, V> consumedInternal() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index f4f9842..81cd569 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -62,8 +62,8 @@ public class StreamSourceNode<K, V> extends SourceGraphNode<K, V> {
     @Override
     public String toString() {
         return "StreamSourceNode{" +
-               "topicNames=" + topicNames() +
-               ", topicPattern=" + topicPattern() +
+               "topicNames=" + (topicNames().isPresent() ? topicNames().get() : null) +
+               ", topicPattern=" + (topicPattern().isPresent() ? topicPattern().get() : null) +
                ", consumedInternal=" + consumedInternal() +
                "} " + super.toString();
     }
@@ -71,21 +71,20 @@ public class StreamSourceNode<K, V> extends SourceGraphNode<K, V> {
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
 
-        if (topicPattern() != null) {
+        if (topicPattern().isPresent()) {
             topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
                                       consumedInternal().valueDeserializer(),
-                                      topicPattern());
+                                      topicPattern().get());
         } else {
             topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                       nodeName(),
                                       consumedInternal().timestampExtractor(),
                                       consumedInternal().keyDeserializer(),
                                       consumedInternal().valueDeserializer(),
-                                      topicNames().toArray(new String[0]));
-
+                                      topicNames().get().toArray(new String[0]));
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 203f3af..3b35673 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import java.util.Iterator;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
@@ -82,7 +83,16 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, V> {
     @Override
     @SuppressWarnings("unchecked")
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        final String topicName = topicNames().iterator().next();
+        final String topicName;
+        if (topicNames().isPresent()) {
+            final Iterator<String> topicNames = topicNames().get().iterator();
+            topicName = topicNames.next();
+            if (topicNames.hasNext()) {
+                throw new IllegalStateException("A table source node must have a single topic as input");
+            }
+        } else {
+            throw new IllegalStateException("A table source node must have a single topic as input");
+        }
 
         // TODO: we assume source KTables can only be timestamped-key-value stores for now.
         // should be expanded for other types of stores as well.