You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/06/15 08:14:48 UTC
[kafka] branch 2.8 updated: KAFKA-12914: StreamSourceNode should
return `null` topic name for pattern subscription (#10846)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new b748340 KAFKA-12914: StreamSourceNode should return `null` topic name for pattern subscription (#10846)
b748340 is described below
commit b748340798028092fce5472490d2e8d53b9a0074
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue Jun 15 00:59:55 2021 -0700
KAFKA-12914: StreamSourceNode should return `null` topic name for pattern subscription (#10846)
Reviewers: Luke Chen <sh...@gmail.com>, Bruno Cadonna <br...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../streams/kstream/internals/InternalStreamsBuilder.java | 11 ++++++-----
.../streams/kstream/internals/graph/SourceGraphNode.java | 9 +++++----
.../streams/kstream/internals/graph/StreamSourceNode.java | 11 +++++------
.../streams/kstream/internals/graph/TableSourceNode.java | 12 +++++++++++-
4 files changed, 27 insertions(+), 16 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 37ece74..cd59427 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -330,16 +330,17 @@ public class InternalStreamsBuilder implements InternalNameProvider {
if (graphNode instanceof StreamSourceNode) {
final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;
- if (currentSourceNode.topicPattern() != null) {
- if (!patternsToSourceNodes.containsKey(currentSourceNode.topicPattern())) {
- patternsToSourceNodes.put(currentSourceNode.topicPattern(), currentSourceNode);
+ if (currentSourceNode.topicPattern().isPresent()) {
+ final Pattern topicPattern = currentSourceNode.topicPattern().get();
+ if (!patternsToSourceNodes.containsKey(topicPattern)) {
+ patternsToSourceNodes.put(topicPattern, currentSourceNode);
} else {
- final StreamSourceNode<?, ?> mainSourceNode = patternsToSourceNodes.get(currentSourceNode.topicPattern());
+ final StreamSourceNode<?, ?> mainSourceNode = patternsToSourceNodes.get(topicPattern);
mainSourceNode.merge(currentSourceNode);
root.removeChild(graphNode);
}
} else {
- for (final String topic : currentSourceNode.topicNames()) {
+ for (final String topic : currentSourceNode.topicNames().get()) {
if (!topicsToSourceNodes.containsKey(topic)) {
topicsToSourceNodes.put(topic, currentSourceNode);
} else {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
index affce1b..0505227 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals.graph;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
@@ -51,12 +52,12 @@ abstract public class SourceGraphNode<K, V> extends GraphNode {
this.consumedInternal = consumedInternal;
}
- public Set<String> topicNames() {
- return Collections.unmodifiableSet(topicNames);
+ public Optional<Set<String>> topicNames() {
+ return topicNames == null ? Optional.empty() : Optional.of(Collections.unmodifiableSet(topicNames));
}
- public Pattern topicPattern() {
- return topicPattern;
+ public Optional<Pattern> topicPattern() {
+ return Optional.ofNullable(topicPattern);
}
public ConsumedInternal<K, V> consumedInternal() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index f4f9842..81cd569 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -62,8 +62,8 @@ public class StreamSourceNode<K, V> extends SourceGraphNode<K, V> {
@Override
public String toString() {
return "StreamSourceNode{" +
- "topicNames=" + topicNames() +
- ", topicPattern=" + topicPattern() +
+ "topicNames=" + (topicNames().isPresent() ? topicNames().get() : null) +
+ ", topicPattern=" + (topicPattern().isPresent() ? topicPattern().get() : null) +
", consumedInternal=" + consumedInternal() +
"} " + super.toString();
}
@@ -71,21 +71,20 @@ public class StreamSourceNode<K, V> extends SourceGraphNode<K, V> {
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
- if (topicPattern() != null) {
+ if (topicPattern().isPresent()) {
topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
nodeName(),
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
consumedInternal().valueDeserializer(),
- topicPattern());
+ topicPattern().get());
} else {
topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
nodeName(),
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
consumedInternal().valueDeserializer(),
- topicNames().toArray(new String[0]));
-
+ topicNames().get().toArray(new String[0]));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 203f3af..3b35673 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals.graph;
+import java.util.Iterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.KTableSource;
@@ -82,7 +83,16 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, V> {
@Override
@SuppressWarnings("unchecked")
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
- final String topicName = topicNames().iterator().next();
+ final String topicName;
+ if (topicNames().isPresent()) {
+ final Iterator<String> topicNames = topicNames().get().iterator();
+ topicName = topicNames.next();
+ if (topicNames.hasNext()) {
+ throw new IllegalStateException("A table source node must have a single topic as input");
+ }
+ } else {
+ throw new IllegalStateException("A table source node must have a single topic as input");
+ }
// TODO: we assume source KTables can only be timestamped-key-value stores for now.
// should be expanded for other types of stores as well.