You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2021/06/24 16:56:28 UTC

[kafka] 01/02: KAFKA-12336 Custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) (#10190)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 85000e1d33ba5bfba4088d4241be6c2da42625cf
Author: Geordie <g1...@gmail.com>
AuthorDate: Fri Jun 25 00:07:22 2021 +0800

    KAFKA-12336 Custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) (#10190)
    
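    Custom stream naming does not work while calling stream[K, V](topicPattern: Pattern):
    the source node for a pattern subscription was always given a generated name, so a
    name supplied through Consumed.withName(...) was ignored. The source node name is now
    derived from the Consumed name, with a generated KStreamImpl.SOURCE_NAME-prefixed name
    as the fallback.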
    
    Reviewers: Bill Bejeck <bb...@apache.org>
---
 .../kstream/internals/InternalStreamsBuilder.java    |  2 +-
 .../kafka/streams/scala/kstream/KStreamTest.scala    | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index cd59427..01ebfbd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -102,7 +102,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
     public <K, V> KStream<K, V> stream(final Pattern topicPattern,
                                        final ConsumedInternal<K, V> consumed) {
-        final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
         final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
 
         addGraphNode(root, streamPatternSourceNode);
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index a7f7f58..d7e7e9a 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.scala.kstream
 
 import java.time.Duration.ofSeconds
 import java.time.Instant
+import java.util.regex.Pattern
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{
@@ -449,4 +450,23 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
     transformNode.name() shouldBe "my-name"
   }
+
+  @Test
+  def testSettingNameOnStream(): Unit = {
+    val builder = new StreamsBuilder()
+    val topicsPattern = "t-[A-Za-z0-9-].suffix"
+    val sinkTopic = "sink"
+
+    builder
+      .stream[String, String](Pattern.compile(topicsPattern))(
+        Consumed.`with`[String, String].withName("my-fancy-name")
+      )
+      .to(sinkTopic)
+
+    import scala.jdk.CollectionConverters._
+
+    val streamNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.head
+    assertEquals("my-fancy-name", streamNode.name())
+  }
+
 }