You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/02/11 22:28:12 UTC
[kafka] branch 2.5 updated: KAFKA-9483: Add Scala KStream#toTable to the Streams DSL (#8024)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 3c1eb10 KAFKA-9483: Add Scala KStream#toTable to the Streams DSL (#8024)
3c1eb10 is described below
commit 3c1eb1021c10f4561530e2a193b77f563cfdd284
Author: high.lee <ye...@daum.net>
AuthorDate: Wed Feb 12 07:15:43 2020 +0900
KAFKA-9483: Add Scala KStream#toTable to the Streams DSL (#8024)
Part of KIP-523
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../kafka/streams/scala/kstream/KStream.scala | 20 +++++++++++++++++
.../kafka/streams/scala/kstream/KStreamTest.scala | 25 ++++++++++++++++++++++
2 files changed, 45 insertions(+)
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 5f3c6be..e8e63e8 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -300,6 +300,26 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.to(extractor, produced)
/**
+ * Convert this stream to a [[KTable]] by calling `inner.toTable` and wrapping the result in a new [[KTable]].
+ *
+ * @return a [[KTable]] that contains the same records as this [[KStream]]
+ * @see `org.apache.kafka.streams.kstream.KStream#toTable`
+ */
+ def toTable: KTable[K, V] =
+ new KTable(inner.toTable)
+
+ /**
+ * Convert this stream to a [[KTable]], passing `materialized` through to `inner.toTable`.
+ *
+ * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+ * should be materialized.
+ * @return a [[KTable]] that contains the same records as this [[KStream]]
+ * @see `org.apache.kafka.streams.kstream.KStream#toTable`
+ */
+ def toTable(materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+ new KTable(inner.toTable(materialized))
+
+ /**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* can be altered arbitrarily).
* A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 14de557..e5a0aad 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -312,4 +312,29 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testDriver.close()
}
+
+ "join 2 KStreamToTables" should "join correctly records" in {
+ val builder = new StreamsBuilder()
+ val sourceTopic1 = "source1"
+ val sourceTopic2 = "source2"
+ val sinkTopic = "sink"
+ // Convert each source stream to a table, join the tables, and send the joined stream to the sink.
+ val table1 = builder.stream[String, String](sourceTopic1).toTable
+ val table2 = builder.stream[String, String](sourceTopic2).toTable
+ table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic)
+
+ val testDriver = createTestDriver(builder)
+ val testInput1 = testDriver.createInput[String, String](sourceTopic1)
+ val testInput2 = testDriver.createInput[String, String](sourceTopic2)
+ val testOutput = testDriver.createOutput[String, String](sinkTopic)
+ // Pipe one record into each source topic, both under the same key "1".
+ testInput1.pipeInput("1", "topic1value1")
+ testInput2.pipeInput("1", "topic2value1")
+ // Expect one joined record whose value is the two input values concatenated.
+ testOutput.readValue shouldBe "topic1value1topic2value1"
+ // No further output is expected.
+ testOutput.isEmpty shouldBe true
+
+ testDriver.close()
+ }
}