You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/16 14:25:00 UTC

[kafka] branch trunk updated: MINOR: Update Streams Scala API for addition of Grouped (#5793)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 86b1150  MINOR: Update Streams Scala API for addition of Grouped (#5793)
86b1150 is described below

commit 86b1150e18d3fe0a0e3019e034e2a6f0204f7a17
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Oct 16 07:24:50 2018 -0700

    MINOR: Update Streams Scala API for addition of Grouped (#5793)
    
    While working on the documentation updates I realized the Streams Scala API needs
    to get updated for the addition of Grouped
    
    Added a test for Grouped.scala ran all streams-scala tests and streams tests
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 docs/streams/upgrade-guide.html                    |  3 ++
 .../streams/kstream/internals/GroupedInternal.java |  2 +-
 .../kafka/streams/scala/ImplicitConversions.scala  | 12 +++---
 .../kstream/{Serialized.scala => Grouped.scala}    | 32 +++++++++++----
 .../kafka/streams/scala/kstream/Joined.scala       | 22 ++++++++++
 .../kafka/streams/scala/kstream/KStream.scala      | 28 ++++++-------
 .../kafka/streams/scala/kstream/KTable.scala       | 11 +++--
 .../kafka/streams/scala/kstream/package.scala      |  2 +-
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala | 48 +++++++++++++++++++---
 .../apache/kafka/streams/scala/TopologyTest.scala  |  2 +-
 .../{SerializedTest.scala => GroupedTest.scala}    | 30 +++++++++-----
 .../kafka/streams/scala/kstream/JoinedTest.scala   | 10 +++++
 12 files changed, 150 insertions(+), 52 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 660b817..e79b106 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -88,6 +88,9 @@
         Additionally, we've updated the <code>Joined</code> class with a new method <code>Joined#withName</code>
         enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table join.  For more details repartition
         topic naming, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping">KIP-372</a>.
+
+        As a result we've updated the Kafka Streams Scala API and removed the <code>Serialized</code> class in favor of adding <code>Grouped</code>.
+        If you just rely on the implicit <code>Serialized</code>, you just need to recompile; if you pass in <code>Serialized</code> explicitly, sorry you'll have to make code changes.
     </p>
 
     <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
index 2360fc6..3569caa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.Grouped;
 
 public class GroupedInternal<K, V> extends Grouped<K, V> {
 
-    GroupedInternal(final Grouped<K, V> grouped) {
+    public GroupedInternal(final Grouped<K, V> grouped) {
         super(grouped);
     }
 
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index c2ac1ff..f62da2e 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -19,6 +19,8 @@
  */
 package org.apache.kafka.streams.scala
 
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{
   KGroupedStream => KGroupedStreamJ,
   KGroupedTable => KGroupedTableJ,
@@ -27,12 +29,10 @@ import org.apache.kafka.streams.kstream.{
   SessionWindowedKStream => SessionWindowedKStreamJ,
   TimeWindowedKStream => TimeWindowedKStreamJ
 }
+import org.apache.kafka.streams.processor.StateStore
 import org.apache.kafka.streams.scala.kstream._
-import org.apache.kafka.streams.KeyValue
-import org.apache.kafka.common.serialization.Serde
 
 import scala.language.implicitConversions
-import org.apache.kafka.streams.processor.StateStore
 
 /**
  * Implicit conversions between the Scala wrapper objects and the underlying Java
@@ -61,10 +61,10 @@ object ImplicitConversions {
   implicit def tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2)
 
   // we would also like to allow users implicit serdes
-  // and these implicits will convert them to `Serialized`, `Produced` or `Consumed`
+  // and these implicits will convert them to `Grouped`, `Produced` or `Consumed`
 
-  implicit def serializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
-    Serialized.`with`[K, V]
+  implicit def groupedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Grouped[K, V] =
+    Grouped.`with`[K, V]
 
   implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] =
     Consumed.`with`[K, V]
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
similarity index 53%
rename from streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala
rename to streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
index f48d9bf..355eb93 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
@@ -14,23 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.streams.scala.kstream
 
 import org.apache.kafka.common.serialization.Serde
-import org.apache.kafka.streams.kstream.{Serialized => SerializedJ}
+import org.apache.kafka.streams.kstream.{Grouped => GroupedJ}
 
-object Serialized {
+object Grouped {
 
   /**
-   * Construct a `Serialized` instance with the provided key and value [[Serde]]s.
+   * Construct a `Grouped` instance with the provided key and value [[Serde]]s.
    * If the [[Serde]] params are `null` the default serdes defined in the configs will be used.
    *
-   * @tparam K         the key type
-   * @tparam V         the value type
+   * @tparam K the key type
+   * @tparam V the value type
    * @param keySerde   keySerde that will be used to materialize a stream
    * @param valueSerde valueSerde that will be used to materialize a stream
-   * @return a new instance of [[Serialized]] configured with the provided serdes
+   * @return a new instance of [[Grouped]] configured with the provided serdes
    */
-  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): SerializedJ[K, V] =
-    SerializedJ.`with`(keySerde, valueSerde)
+  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): GroupedJ[K, V] =
+    GroupedJ.`with`(keySerde, valueSerde)
+
+  /**
+   * Construct a `Grouped` instance with the provided key and value [[Serde]]s.
+   * If the [[Serde]] params are `null` the default serdes defined in the configs will be used.
+   *
+   * @tparam K the key type
+   * @tparam V the value type
+   * @param name the name used as part of a potential repartition topic
+   * @param keySerde   keySerde that will be used to materialize a stream
+   * @param valueSerde valueSerde that will be used to materialize a stream
+   * @return a new instance of [[Grouped]] configured with the provided serdes
+   */
+  def `with`[K, V](name: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): GroupedJ[K, V] =
+    GroupedJ.`with`(name, keySerde, valueSerde)
+
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
index ffd3e61..b6dbb05 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
@@ -39,4 +39,26 @@ object Joined {
                        otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
     JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
 
+  /**
+   * Create an instance of [[org.apache.kafka.streams.kstream.Joined]] with key, value, and otherValue [[Serde]]
+   * instances.
+   * `null` values are accepted and will be replaced by the default serdes as defined in config.
+   *
+   * @tparam K              key type
+   * @tparam V              value type
+   * @tparam VO             other value type
+   * @param name            name of possible repartition topic
+   * @param keySerde        the key serde to use.
+   * @param valueSerde      the value serde to use.
+   * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
+   * @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes
+   */
+  // disable spotless scala, which wants to make a mess of the argument lists
+  // format: off
+  def `with`[K, V, VO](name: String)
+                      (implicit keySerde: Serde[K],
+                       valueSerde: Serde[V],
+                       otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
+    JoinedJ.`with`(keySerde, valueSerde, otherValueSerde, name)
+  // format:on
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index d54ac5a..635975b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -23,8 +23,8 @@ package kstream
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
 import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
-import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
+import org.apache.kafka.streams.scala.ImplicitConversions._
 
 import scala.collection.JavaConverters._
 
@@ -351,8 +351,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   /**
    * Group the records by their current key into a [[KGroupedStream]]
    * <p>
-   * The user can either supply the `Serialized` instance as an implicit in scope or she can also provide an implicit
-   * serdes that will be converted to a `Serialized` instance implicitly.
+   * The user can either supply the `Grouped` instance as an implicit in scope or she can also provide an implicit
+   * serdes that will be converted to a `Grouped` instance implicitly.
    * <p>
    * {{{
    * Example:
@@ -365,28 +365,28 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *     .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
    *     .map((_, regionWithClicks) => regionWithClicks)
    *
-   *     // the groupByKey gets the Serialized instance through an implicit conversion of the
+   *     // the groupByKey gets the Grouped instance through an implicit conversion of the
    *     // serdes brought into scope through the import Serdes._ above
    *     .groupByKey
    *     .reduce(_ + _)
    *
-   * // Similarly you can create an implicit Serialized and it will be passed implicitly
+   * // Similarly you can create an implicit Grouped and it will be passed implicitly
    * // to the groupByKey call
    * }}}
    *
-   * @param serialized the instance of Serialized that gives the serdes
+   * @param grouped the instance of Grouped that gives the serdes
    * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
    */
-  def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStream[K, V] =
-    inner.groupByKey(serialized)
+  def groupByKey(implicit grouped: Grouped[K, V]): KGroupedStream[K, V] =
+    inner.groupByKey(grouped)
 
   /**
    * Group the records of this [[KStream]] on a new key that is selected using the provided key transformation function
-   * and the `Serialized` instance.
+   * and the `Grouped` instance.
    * <p>
-   * The user can either supply the `Serialized` instance as an implicit in scope or she can also provide an implicit
-   * serdes that will be converted to a `Serialized` instance implicitly.
+   * The user can either supply the `Grouped` instance as an implicit in scope or she can also provide an implicit
+   * serdes that will be converted to a `Grouped` instance implicitly.
    * <p>
    * {{{
    * Example:
@@ -401,7 +401,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * val wordCounts: KTable[String, Long] =
    *   textLines.flatMapValues(v => pattern.split(v.toLowerCase))
    *
-   *     // the groupBy gets the Serialized instance through an implicit conversion of the
+   *     // the groupBy gets the Grouped instance through an implicit conversion of the
    *     // serdes brought into scope through the import Serdes._ above
    *     .groupBy((k, v) => v)
    *
@@ -412,8 +412,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#groupBy`
    */
-  def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR, V]): KGroupedStream[KR, V] =
-    inner.groupBy(selector.asKeyValueMapper, serialized)
+  def groupBy[KR](selector: (K, V) => KR)(implicit grouped: Grouped[KR, V]): KGroupedStream[KR, V] =
+    inner.groupBy(selector.asKeyValueMapper, grouped)
 
   /**
    * Join records of this stream with another [[KStream]]'s records using windowed inner equi join with
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 881c8e0..9ac27ee 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -20,11 +20,10 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.common.serialization.Serde
 import org.apache.kafka.common.utils.Bytes
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
-import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
+import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.state.KeyValueStore
 
 /**
@@ -213,15 +212,15 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
 
   /**
    * Re-groups the records of this [[KTable]] using the provided key/value mapper
-   * and `Serde`s as specified by `Serialized`.
+   * and `Serde`s as specified by `Grouped`.
    *
    * @param selector      a function that computes a new grouping key and value to be aggregated
-   * @param serialized    the `Serialized` instance used to specify `Serdes`
+   * @param grouped       the `Grouped` instance used to specify `Serdes`
    * @return a [[KGroupedTable]] that contains the re-grouped records of the original [[KTable]]
    * @see `org.apache.kafka.streams.kstream.KTable#groupBy`
    */
-  def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] =
-    inner.groupBy(selector.asKeyValueMapper, serialized)
+  def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit grouped: Grouped[KR, VR]): KGroupedTable[KR, VR] =
+    inner.groupBy(selector.asKeyValueMapper, grouped)
 
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
index 842dd79..db4463b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
@@ -20,7 +20,7 @@ import org.apache.kafka.streams.processor.StateStore
 
 package object kstream {
   type Materialized[K, V, S <: StateStore] = org.apache.kafka.streams.kstream.Materialized[K, V, S]
-  type Serialized[K, V] = org.apache.kafka.streams.kstream.Serialized[K, V]
+  type Grouped[K, V] = org.apache.kafka.streams.kstream.Grouped[K, V]
   type Consumed[K, V] = org.apache.kafka.streams.kstream.Consumed[K, V]
   type Produced[K, V] = org.apache.kafka.streams.kstream.Produced[K, V]
   type Joined[K, V, VO] = org.apache.kafka.streams.kstream.Joined[K, V, VO]
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 44c3605..523418d 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -40,8 +40,48 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
 
   @Test def testShouldCountClicksPerRegion(): Unit = {
 
-    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
-    // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
+    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Grouped, Produced,
+    // Consumed and Joined instances. So all APIs below that accept Grouped, Produced, Consumed or Joined will
+    // get these instances automatically
+    import Serdes._
+
+    val streamsConfiguration: Properties = getStreamsConfiguration()
+
+    val builder = new StreamsBuilder()
+
+    val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
+
+    val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
+
+    // Compute the total per region by summing the individual click counts per region.
+    val clicksPerRegion: KTable[String, Long] =
+      userClicksStream
+
+      // Join the stream against the table.
+        .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
+
+        // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+        .map((_, regionWithClicks) => regionWithClicks)
+
+        // Compute the total per region by summing the individual click counts per region.
+        .groupByKey
+        .reduce(_ + _)
+
+    // Write the (continuously updating) results to the output topic.
+    clicksPerRegion.toStream.to(outputTopic)
+
+    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
+    streams.start()
+
+    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
+      produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
+    streams.close()
+  }
+
+  @Test def testShouldCountClicksPerRegionWithNamedRepartitionTopic(): Unit = {
+
+    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Grouped, Produced,
+    // Consumed and Joined instances. So all APIs below that accept Grouped, Produced, Consumed or Joined will
     // get these instances automatically
     import Serdes._
 
@@ -85,8 +125,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
     import org.apache.kafka.streams.kstream.{KStream => KStreamJ, KTable => KTableJ, _}
     import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
 
-    import collection.JavaConverters._
-
     val streamsConfiguration: Properties = getStreamsConfiguration()
 
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
@@ -122,7 +160,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
 
     // Compute the total per region by summing the individual click counts per region.
     val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
-      .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
+      .groupByKey(Grouped.`with`[String, JLong](Serdes.String, Serdes.JavaLong))
       .reduce {
         new Reducer[JLong] {
           def apply(v1: JLong, v2: JLong) = v1 + v2
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 889e67c..a826401 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -191,7 +191,7 @@ class TopologyTest extends JUnitSuite {
 
       // Compute the total per region by summing the individual click counts per region.
       val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
-        .groupByKey(Serialized.`with`[String, JLong])
+        .groupByKey(Grouped.`with`[String, JLong])
         .reduce {
           new Reducer[JLong] {
             def apply(v1: JLong, v2: JLong) = v1 + v2
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/GroupedTest.scala
similarity index 52%
rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
rename to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/GroupedTest.scala
index 4264fa5..728562a 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/GroupedTest.scala
@@ -1,6 +1,4 @@
 /*
- * Copyright (C) 2018 Joan Goyeau.
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,23 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.streams.scala.kstream
 
-import org.apache.kafka.streams.kstream.internals.SerializedInternal
-import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.kstream.internals.GroupedInternal
 import org.apache.kafka.streams.scala.Serdes
+import org.apache.kafka.streams.scala.Serdes._
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpec, Matchers}
 
 @RunWith(classOf[JUnitRunner])
-class SerializedTest extends FlatSpec with Matchers {
+class GroupedTest extends FlatSpec with Matchers {
 
-  "Create a Serialized" should "create a Serialized with Serdes" in {
-    val serialized: Serialized[String, Long] = Serialized.`with`[String, Long]
+  "Create a Grouped" should "create a Grouped with Serdes" in {
+    val grouped: Grouped[String, Long] = Grouped.`with`[String, Long]
 
-    val internalSerialized = new SerializedInternal(serialized)
-    internalSerialized.keySerde.getClass shouldBe Serdes.String.getClass
-    internalSerialized.valueSerde.getClass shouldBe Serdes.Long.getClass
+    val internalGrouped = new GroupedInternal[String, Long](grouped)
+    internalGrouped.keySerde.getClass shouldBe Serdes.String.getClass
+    internalGrouped.valueSerde.getClass shouldBe Serdes.Long.getClass
   }
+
+  "Create a Grouped with repartition topic name" should "create a Grouped with Serdes, and repartition topic name" in {
+    val repartitionTopicName = "repartition-topic"
+    val grouped: Grouped[String, Long] = Grouped.`with`(repartitionTopicName)
+
+    val internalGrouped = new GroupedInternal[String, Long](grouped)
+    internalGrouped.keySerde.getClass shouldBe Serdes.String.getClass
+    internalGrouped.valueSerde.getClass shouldBe Serdes.Long.getClass
+    internalGrouped.name() shouldBe repartitionTopicName
+  }
+
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
index 288b790..9a96a81 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
@@ -34,4 +34,14 @@ class JoinedTest extends FlatSpec with Matchers {
     joined.valueSerde.getClass shouldBe Serdes.Long.getClass
     joined.otherValueSerde.getClass shouldBe Serdes.Integer.getClass
   }
+
+  "Create a Joined" should "create a Joined with Serdes and repartition topic name" in {
+    val repartitionTopicName = "repartition-topic"
+    val joined: Joined[String, Long, Int] = Joined.`with`(repartitionTopicName)
+
+    joined.keySerde.getClass shouldBe Serdes.String.getClass
+    joined.valueSerde.getClass shouldBe Serdes.Long.getClass
+    joined.otherValueSerde.getClass shouldBe Serdes.Integer.getClass
+    joined.name() shouldBe repartitionTopicName
+  }
 }