You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/02/11 22:28:12 UTC

[kafka] branch 2.5 updated: KAFKA-9483: Add Scala KStream#toTable to the Streams DSL (#8024)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 3c1eb10  KAFKA-9483: Add Scala KStream#toTable to the Streams DSL (#8024)
3c1eb10 is described below

commit 3c1eb1021c10f4561530e2a193b77f563cfdd284
Author: high.lee <ye...@daum.net>
AuthorDate: Wed Feb 12 07:15:43 2020 +0900

    KAFKA-9483: Add Scala KStream#toTable to the Streams DSL (#8024)
    
    Part of KIP-523
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../kafka/streams/scala/kstream/KStream.scala      | 20 +++++++++++++++++
 .../kafka/streams/scala/kstream/KStreamTest.scala  | 25 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 5f3c6be..e8e63e8 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -300,6 +300,26 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.to(extractor, produced)
 
   /**
+   * Convert this stream to a [[KTable]].
+   *
+   * @return a [[KTable]] that contains the same records as this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#toTable`
+   */
+  def toTable: KTable[K, V] =
+    new KTable(inner.toTable)
+
+  /**
+   * Convert this stream to a [[KTable]].
+   *
+   * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                      should be materialized.
+   * @return a [[KTable]] that contains the same records as this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#toTable`
+   */
+  def toTable(materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+    new KTable(inner.toTable(materialized))
+
+  /**
    * Transform each record of the input stream into zero or more records in the output stream (both key and value type
    * can be altered arbitrarily).
    * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 14de557..e5a0aad 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -312,4 +312,29 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
 
     testDriver.close()
   }
+
+  "join 2 KStreamToTables" should "join correctly records" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic1 = "source1"
+    val sourceTopic2 = "source2"
+    val sinkTopic = "sink"
+
+    val table1 = builder.stream[String, String](sourceTopic1).toTable
+    val table2 = builder.stream[String, String](sourceTopic2).toTable
+    table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+    val testInput1 = testDriver.createInput[String, String](sourceTopic1)
+    val testInput2 = testDriver.createInput[String, String](sourceTopic2)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput1.pipeInput("1", "topic1value1")
+    testInput2.pipeInput("1", "topic2value1")
+
+    testOutput.readValue shouldBe "topic1value1topic2value1"
+
+    testOutput.isEmpty shouldBe true
+
+    testDriver.close()
+  }
 }