You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/16 14:25:00 UTC
[kafka] branch trunk updated: MINOR: Update Streams Scala API for
addition of Grouped (#5793)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 86b1150 MINOR: Update Streams Scala API for addition of Grouped (#5793)
86b1150 is described below
commit 86b1150e18d3fe0a0e3019e034e2a6f0204f7a17
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Oct 16 07:24:50 2018 -0700
MINOR: Update Streams Scala API for addition of Grouped (#5793)
While working on the documentation updates I realized the Streams Scala API needs
to get updated for the addition of Grouped
Added a test for Grouped.scala ran all streams-scala tests and streams tests
Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
docs/streams/upgrade-guide.html | 3 ++
.../streams/kstream/internals/GroupedInternal.java | 2 +-
.../kafka/streams/scala/ImplicitConversions.scala | 12 +++---
.../kstream/{Serialized.scala => Grouped.scala} | 32 +++++++++++----
.../kafka/streams/scala/kstream/Joined.scala | 22 ++++++++++
.../kafka/streams/scala/kstream/KStream.scala | 28 ++++++-------
.../kafka/streams/scala/kstream/KTable.scala | 11 +++--
.../kafka/streams/scala/kstream/package.scala | 2 +-
...bleJoinScalaIntegrationTestImplicitSerdes.scala | 48 +++++++++++++++++++---
.../apache/kafka/streams/scala/TopologyTest.scala | 2 +-
.../{SerializedTest.scala => GroupedTest.scala} | 30 +++++++++-----
.../kafka/streams/scala/kstream/JoinedTest.scala | 10 +++++
12 files changed, 150 insertions(+), 52 deletions(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 660b817..e79b106 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -88,6 +88,9 @@
Additionally, we've updated the <code>Joined</code> class with a new method <code>Joined#withName</code>
enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table join. For more details repartition
topic naming, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping">KIP-372</a>.
+
+ As a result we've updated the Kafka Streams Scala API and removed the <code>Serialized</code> class in favor of adding <code>Grouped</code>.
+ If you just rely on the implicit <code>Serialized</code>, you just need to recompile; if you pass in <code>Serialized</code> explicitly, sorry you'll have to make code changes.
</p>
<p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
index 2360fc6..3569caa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.Grouped;
public class GroupedInternal<K, V> extends Grouped<K, V> {
- GroupedInternal(final Grouped<K, V> grouped) {
+ public GroupedInternal(final Grouped<K, V> grouped) {
super(grouped);
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index c2ac1ff..f62da2e 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -19,6 +19,8 @@
*/
package org.apache.kafka.streams.scala
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{
KGroupedStream => KGroupedStreamJ,
KGroupedTable => KGroupedTableJ,
@@ -27,12 +29,10 @@ import org.apache.kafka.streams.kstream.{
SessionWindowedKStream => SessionWindowedKStreamJ,
TimeWindowedKStream => TimeWindowedKStreamJ
}
+import org.apache.kafka.streams.processor.StateStore
import org.apache.kafka.streams.scala.kstream._
-import org.apache.kafka.streams.KeyValue
-import org.apache.kafka.common.serialization.Serde
import scala.language.implicitConversions
-import org.apache.kafka.streams.processor.StateStore
/**
* Implicit conversions between the Scala wrapper objects and the underlying Java
@@ -61,10 +61,10 @@ object ImplicitConversions {
implicit def tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2)
// we would also like to allow users implicit serdes
- // and these implicits will convert them to `Serialized`, `Produced` or `Consumed`
+ // and these implicits will convert them to `Grouped`, `Produced` or `Consumed`
- implicit def serializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
- Serialized.`with`[K, V]
+ implicit def groupedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Grouped[K, V] =
+ Grouped.`with`[K, V]
implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] =
Consumed.`with`[K, V]
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
similarity index 53%
rename from streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala
rename to streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
index f48d9bf..355eb93 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
@@ -14,23 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.common.serialization.Serde
-import org.apache.kafka.streams.kstream.{Serialized => SerializedJ}
+import org.apache.kafka.streams.kstream.{Grouped => GroupedJ}
-object Serialized {
+object Grouped {
/**
- * Construct a `Serialized` instance with the provided key and value [[Serde]]s.
+ * Construct a `Grouped` instance with the provided key and value [[Serde]]s.
* If the [[Serde]] params are `null` the default serdes defined in the configs will be used.
*
- * @tparam K the key type
- * @tparam V the value type
+ * @tparam K the key type
+ * @tparam V the value type
* @param keySerde keySerde that will be used to materialize a stream
* @param valueSerde valueSerde that will be used to materialize a stream
- * @return a new instance of [[Serialized]] configured with the provided serdes
+ * @return a new instance of [[Grouped]] configured with the provided serdes
*/
- def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): SerializedJ[K, V] =
- SerializedJ.`with`(keySerde, valueSerde)
+ def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): GroupedJ[K, V] =
+ GroupedJ.`with`(keySerde, valueSerde)
+
+ /**
+ * Construct a `Grouped` instance with the provided key and value [[Serde]]s.
+ * If the [[Serde]] params are `null` the default serdes defined in the configs will be used.
+ *
+ * @tparam K the key type
+ * @tparam V the value type
+ * @param name the name used as part of a potential repartition topic
+ * @param keySerde keySerde that will be used to materialize a stream
+ * @param valueSerde valueSerde that will be used to materialize a stream
+ * @return a new instance of [[Grouped]] configured with the provided serdes
+ */
+ def `with`[K, V](name: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): GroupedJ[K, V] =
+ GroupedJ.`with`(name, keySerde, valueSerde)
+
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
index ffd3e61..b6dbb05 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
@@ -39,4 +39,26 @@ object Joined {
otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
+ /**
+ * Create an instance of [[org.apache.kafka.streams.kstream.Joined]] with key, value, and otherValue [[Serde]]
+ * instances.
+ * `null` values are accepted and will be replaced by the default serdes as defined in config.
+ *
+ * @tparam K key type
+ * @tparam V value type
+ * @tparam VO other value type
+ * @param name name of possible repartition topic
+ * @param keySerde the key serde to use.
+ * @param valueSerde the value serde to use.
+ * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
+ * @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes
+ */
+ // disable spotless scala, which wants to make a mess of the argument lists
+ // format: off
+ def `with`[K, V, VO](name: String)
+ (implicit keySerde: Serde[K],
+ valueSerde: Serde[V],
+ otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
+ JoinedJ.`with`(keySerde, valueSerde, otherValueSerde, name)
+ // format:on
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index d54ac5a..635975b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -23,8 +23,8 @@ package kstream
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
-import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
+import org.apache.kafka.streams.scala.ImplicitConversions._
import scala.collection.JavaConverters._
@@ -351,8 +351,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
/**
* Group the records by their current key into a [[KGroupedStream]]
* <p>
- * The user can either supply the `Serialized` instance as an implicit in scope or she can also provide an implicit
- * serdes that will be converted to a `Serialized` instance implicitly.
+ * The user can either supply the `Grouped` instance as an implicit in scope or she can also provide an implicit
+ * serdes that will be converted to a `Grouped` instance implicitly.
* <p>
* {{{
* Example:
@@ -365,28 +365,28 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
* .map((_, regionWithClicks) => regionWithClicks)
*
- * // the groupByKey gets the Serialized instance through an implicit conversion of the
+ * // the groupByKey gets the Grouped instance through an implicit conversion of the
* // serdes brought into scope through the import Serdes._ above
* .groupByKey
* .reduce(_ + _)
*
- * // Similarly you can create an implicit Serialized and it will be passed implicitly
+ * // Similarly you can create an implicit Grouped and it will be passed implicitly
* // to the groupByKey call
* }}}
*
- * @param serialized the instance of Serialized that gives the serdes
+ * @param grouped the instance of Grouped that gives the serdes
* @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
*/
- def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStream[K, V] =
- inner.groupByKey(serialized)
+ def groupByKey(implicit grouped: Grouped[K, V]): KGroupedStream[K, V] =
+ inner.groupByKey(grouped)
/**
* Group the records of this [[KStream]] on a new key that is selected using the provided key transformation function
- * and the `Serialized` instance.
+ * and the `Grouped` instance.
* <p>
- * The user can either supply the `Serialized` instance as an implicit in scope or she can also provide an implicit
- * serdes that will be converted to a `Serialized` instance implicitly.
+ * The user can either supply the `Grouped` instance as an implicit in scope or she can also provide an implicit
+ * serdes that will be converted to a `Grouped` instance implicitly.
* <p>
* {{{
* Example:
@@ -401,7 +401,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* val wordCounts: KTable[String, Long] =
* textLines.flatMapValues(v => pattern.split(v.toLowerCase))
*
- * // the groupBy gets the Serialized instance through an implicit conversion of the
+ * // the groupBy gets the Grouped instance through an implicit conversion of the
* // serdes brought into scope through the import Serdes._ above
* .groupBy((k, v) => v)
*
@@ -412,8 +412,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#groupBy`
*/
- def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR, V]): KGroupedStream[KR, V] =
- inner.groupBy(selector.asKeyValueMapper, serialized)
+ def groupBy[KR](selector: (K, V) => KR)(implicit grouped: Grouped[KR, V]): KGroupedStream[KR, V] =
+ inner.groupBy(selector.asKeyValueMapper, grouped)
/**
* Join records of this stream with another [[KStream]]'s records using windowed inner equi join with
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 881c8e0..9ac27ee 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -20,11 +20,10 @@
package org.apache.kafka.streams.scala
package kstream
-import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
-import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
+import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.state.KeyValueStore
/**
@@ -213,15 +212,15 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
/**
* Re-groups the records of this [[KTable]] using the provided key/value mapper
- * and `Serde`s as specified by `Serialized`.
+ * and `Serde`s as specified by `Grouped`.
*
* @param selector a function that computes a new grouping key and value to be aggregated
- * @param serialized the `Serialized` instance used to specify `Serdes`
+ * @param grouped the `Grouped` instance used to specify `Serdes`
* @return a [[KGroupedTable]] that contains the re-grouped records of the original [[KTable]]
* @see `org.apache.kafka.streams.kstream.KTable#groupBy`
*/
- def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] =
- inner.groupBy(selector.asKeyValueMapper, serialized)
+ def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit grouped: Grouped[KR, VR]): KGroupedTable[KR, VR] =
+ inner.groupBy(selector.asKeyValueMapper, grouped)
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
index 842dd79..db4463b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
@@ -20,7 +20,7 @@ import org.apache.kafka.streams.processor.StateStore
package object kstream {
type Materialized[K, V, S <: StateStore] = org.apache.kafka.streams.kstream.Materialized[K, V, S]
- type Serialized[K, V] = org.apache.kafka.streams.kstream.Serialized[K, V]
+ type Grouped[K, V] = org.apache.kafka.streams.kstream.Grouped[K, V]
type Consumed[K, V] = org.apache.kafka.streams.kstream.Consumed[K, V]
type Produced[K, V] = org.apache.kafka.streams.kstream.Produced[K, V]
type Joined[K, V, VO] = org.apache.kafka.streams.kstream.Joined[K, V, VO]
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 44c3605..523418d 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -40,8 +40,48 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
@Test def testShouldCountClicksPerRegion(): Unit = {
- // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
- // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
+ // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Grouped, Produced,
+ // Consumed and Joined instances. So all APIs below that accept Grouped, Produced, Consumed or Joined will
+ // get these instances automatically
+ import Serdes._
+
+ val streamsConfiguration: Properties = getStreamsConfiguration()
+
+ val builder = new StreamsBuilder()
+
+ val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
+
+ val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
+
+ // Compute the total per region by summing the individual click counts per region.
+ val clicksPerRegion: KTable[String, Long] =
+ userClicksStream
+
+ // Join the stream against the table.
+ .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
+
+ // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+ .map((_, regionWithClicks) => regionWithClicks)
+
+ // Compute the total per region by summing the individual click counts per region.
+ .groupByKey
+ .reduce(_ + _)
+
+ // Write the (continuously updating) results to the output topic.
+ clicksPerRegion.toStream.to(outputTopic)
+
+ val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
+ streams.start()
+
+ val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
+ produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
+ streams.close()
+ }
+
+ @Test def testShouldCountClicksPerRegionWithNamedRepartitionTopic(): Unit = {
+
+ // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Grouped, Produced,
+ // Consumed and Joined instances. So all APIs below that accept Grouped, Produced, Consumed or Joined will
// get these instances automatically
import Serdes._
@@ -85,8 +125,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, KTable => KTableJ, _}
import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
- import collection.JavaConverters._
-
val streamsConfiguration: Properties = getStreamsConfiguration()
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
@@ -122,7 +160,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
- .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
+ .groupByKey(Grouped.`with`[String, JLong](Serdes.String, Serdes.JavaLong))
.reduce {
new Reducer[JLong] {
def apply(v1: JLong, v2: JLong) = v1 + v2
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 889e67c..a826401 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -191,7 +191,7 @@ class TopologyTest extends JUnitSuite {
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
- .groupByKey(Serialized.`with`[String, JLong])
+ .groupByKey(Grouped.`with`[String, JLong])
.reduce {
new Reducer[JLong] {
def apply(v1: JLong, v2: JLong) = v1 + v2
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/GroupedTest.scala
similarity index 52%
rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
rename to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/GroupedTest.scala
index 4264fa5..728562a 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/GroupedTest.scala
@@ -1,6 +1,4 @@
/*
- * Copyright (C) 2018 Joan Goyeau.
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,23 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.streams.scala.kstream
-import org.apache.kafka.streams.kstream.internals.SerializedInternal
-import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.kstream.internals.GroupedInternal
import org.apache.kafka.streams.scala.Serdes
+import org.apache.kafka.streams.scala.Serdes._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
@RunWith(classOf[JUnitRunner])
-class SerializedTest extends FlatSpec with Matchers {
+class GroupedTest extends FlatSpec with Matchers {
- "Create a Serialized" should "create a Serialized with Serdes" in {
- val serialized: Serialized[String, Long] = Serialized.`with`[String, Long]
+ "Create a Grouped" should "create a Grouped with Serdes" in {
+ val grouped: Grouped[String, Long] = Grouped.`with`[String, Long]
- val internalSerialized = new SerializedInternal(serialized)
- internalSerialized.keySerde.getClass shouldBe Serdes.String.getClass
- internalSerialized.valueSerde.getClass shouldBe Serdes.Long.getClass
+ val internalGrouped = new GroupedInternal[String, Long](grouped)
+ internalGrouped.keySerde.getClass shouldBe Serdes.String.getClass
+ internalGrouped.valueSerde.getClass shouldBe Serdes.Long.getClass
}
+
+ "Create a Grouped with repartition topic name" should "create a Grouped with Serdes, and repartition topic name" in {
+ val repartitionTopicName = "repartition-topic"
+ val grouped: Grouped[String, Long] = Grouped.`with`(repartitionTopicName)
+
+ val internalGrouped = new GroupedInternal[String, Long](grouped)
+ internalGrouped.keySerde.getClass shouldBe Serdes.String.getClass
+ internalGrouped.valueSerde.getClass shouldBe Serdes.Long.getClass
+ internalGrouped.name() shouldBe repartitionTopicName
+ }
+
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
index 288b790..9a96a81 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
@@ -34,4 +34,14 @@ class JoinedTest extends FlatSpec with Matchers {
joined.valueSerde.getClass shouldBe Serdes.Long.getClass
joined.otherValueSerde.getClass shouldBe Serdes.Integer.getClass
}
+
+ "Create a Joined" should "create a Joined with Serdes and repartition topic name" in {
+ val repartitionTopicName = "repartition-topic"
+ val joined: Joined[String, Long, Int] = Joined.`with`(repartitionTopicName)
+
+ joined.keySerde.getClass shouldBe Serdes.String.getClass
+ joined.valueSerde.getClass shouldBe Serdes.Long.getClass
+ joined.otherValueSerde.getClass shouldBe Serdes.Integer.getClass
+ joined.name() shouldBe repartitionTopicName
+ }
}