You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2021/06/24 16:56:27 UTC

[kafka] branch 2.8 updated (bab9398 -> 74d2f8a)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a change to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from bab9398  KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
     new 85000e1  KAFKA-12336 Custom stream naming does not work while calling stream[K… (#10190)
     new 74d2f8a  Update cherry-pick to fit 2.8 testing style for scala

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kstream/internals/InternalStreamsBuilder.java     |  2 +-
 .../kafka/streams/scala/kstream/KStreamTest.scala     | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

[kafka] 02/02: Update cherry-pick to fit 2.8 testing style for scala

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 74d2f8abb4c8b9ec74578cd492ad4ada152458f3
Author: bill <bb...@gmail.com>
AuthorDate: Thu Jun 24 12:54:59 2021 -0400

    Update cherry-pick to fit 2.8 testing style for scala
---
 .../scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala   | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index d7e7e9a..34fcd14 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -451,8 +451,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     transformNode.name() shouldBe "my-name"
   }
 
-  @Test
-  def testSettingNameOnStream(): Unit = {
+  "Setting a name for a pattern source" should "pass the name to the topology" in {
     val builder = new StreamsBuilder()
     val topicsPattern = "t-[A-Za-z0-9-].suffix"
     val sinkTopic = "sink"
@@ -466,7 +465,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     import scala.jdk.CollectionConverters._
 
     val streamNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.head
-    assertEquals("my-fancy-name", streamNode.name())
+    streamNode.name() shouldBe "my-fancy-name"
   }
 
 }

[kafka] 01/02: KAFKA-12336 Custom stream naming does not work while calling stream[K… (#10190)

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 85000e1d33ba5bfba4088d4241be6c2da42625cf
Author: Geordie <g1...@gmail.com>
AuthorDate: Fri Jun 25 00:07:22 2021 +0800

    KAFKA-12336 Custom stream naming does not work while calling stream[K… (#10190)
    
    Custom stream naming does not work when calling stream[K, V](topicPattern: Pattern)
    
    Reviewers: Bill Bejeck <bb...@apache.org>
---
 .../kstream/internals/InternalStreamsBuilder.java    |  2 +-
 .../kafka/streams/scala/kstream/KStreamTest.scala    | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index cd59427..01ebfbd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -102,7 +102,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
     public <K, V> KStream<K, V> stream(final Pattern topicPattern,
                                        final ConsumedInternal<K, V> consumed) {
-        final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
         final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
 
         addGraphNode(root, streamPatternSourceNode);
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index a7f7f58..d7e7e9a 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.scala.kstream
 
 import java.time.Duration.ofSeconds
 import java.time.Instant
+import java.util.regex.Pattern
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{
@@ -449,4 +450,23 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
     transformNode.name() shouldBe "my-name"
   }
+
+  @Test
+  def testSettingNameOnStream(): Unit = {
+    val builder = new StreamsBuilder()
+    val topicsPattern = "t-[A-Za-z0-9-].suffix"
+    val sinkTopic = "sink"
+
+    builder
+      .stream[String, String](Pattern.compile(topicsPattern))(
+        Consumed.`with`[String, String].withName("my-fancy-name")
+      )
+      .to(sinkTopic)
+
+    import scala.jdk.CollectionConverters._
+
+    val streamNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.head
+    assertEquals("my-fancy-name", streamNode.name())
+  }
+
 }