Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/11 17:15:55 UTC

[kafka] branch trunk updated: MINOR: Count fix and Type alias refactor in Streams Scala API (#4966)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 40d191b  MINOR: Count fix and Type alias refactor in Streams Scala API (#4966)
40d191b is described below

commit 40d191b563cb8e94b28b15a217a759a1df9b6759
Author: Joan Goyeau <jo...@goyeau.com>
AuthorDate: Fri May 11 18:15:48 2018 +0100

    MINOR: Count fix and Type alias refactor in Streams Scala API (#4966)
    
    Reviewers: Debasish Ghosh <dg...@acm.org>, Guozhang Wang <gu...@confluent.io>
---
 .../scala/org/apache/kafka/streams/package.scala   |  4 +-
 .../kafka/streams/scala/StreamsBuilder.scala       |  1 -
 .../streams/scala/kstream/KGroupedStream.scala     | 14 +++---
 .../streams/scala/kstream/KGroupedTable.scala      | 29 +++++-------
 .../kafka/streams/scala/kstream/KTable.scala       | 54 +++++++---------------
 .../scala/kstream/SessionWindowedKStream.scala     | 26 ++++-------
 .../scala/kstream/TimeWindowedKStream.scala        | 27 ++++-------
 .../apache/kafka/streams/scala/WordCountTest.scala | 33 ++++++++++++-
 8 files changed, 88 insertions(+), 100 deletions(-)
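
In short, the patch does two things. The count fix routes the Materialized
overloads of count on KGroupedTable and SessionWindowedKStream through the
same cast-and-unbox path that KGroupedStream and TimeWindowedKStream already
used, so all of them now return KTables with Scala Long values: the Scala-side
Materialized[K, Long, _] is cast to the java.lang.Long-typed Materialized that
the Java API expects, and the boxed counts are mapped back with Long2long. The
type alias refactor adds ByteArraySessionStore and ByteArrayWindowStore next
to the existing ByteArrayKeyValueStore and uses them throughout the windowed
signatures. A minimal sketch of the unboxing step (illustrative, not part of
the patch):

    // scala.Predef.Long2long converts a boxed java.lang.Long to a Scala Long;
    // the count overloads apply it via c.mapValues[Long](Long2long _).
    val boxed: java.lang.Long = java.lang.Long.valueOf(42L)
    val unboxed: Long = Long2long(boxed)
    assert(unboxed == 42L)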

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala
index 864fd19..01f9833 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala
@@ -19,9 +19,11 @@
  */
 package org.apache.kafka.streams
 
-import org.apache.kafka.streams.state.KeyValueStore
+import org.apache.kafka.streams.state.{KeyValueStore, SessionStore, WindowStore}
 import org.apache.kafka.common.utils.Bytes
 
 package object scala {
   type ByteArrayKeyValueStore = KeyValueStore[Bytes, Array[Byte]]
+  type ByteArraySessionStore = SessionStore[Bytes, Array[Byte]]
+  type ByteArrayWindowStore = WindowStore[Bytes, Array[Byte]]
 }
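
The two new aliases, like the existing ByteArrayKeyValueStore, are transparent
type synonyms, so signatures written against them are interchangeable with the
spelled-out store types. An illustrative check (not part of the patch):

    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.scala._
    import org.apache.kafka.streams.state.{SessionStore, WindowStore}

    // Both spellings denote the same type, so these identities type-check.
    def windows(s: ByteArrayWindowStore): WindowStore[Bytes, Array[Byte]] = s
    def sessions(s: ByteArraySessionStore): SessionStore[Bytes, Array[Byte]] = s
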
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 397af32..3f58dd3 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.{GlobalKTable, Materialized}
 import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
 import org.apache.kafka.streams.state.StoreBuilder
 import org.apache.kafka.streams.{Consumed, StreamsBuilder => StreamsBuilderJ, Topology}
-
 import org.apache.kafka.streams.scala.kstream._
 import ImplicitConversions._
 import scala.collection.JavaConverters._
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 4cee0ac..acffb1f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -58,8 +58,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
    */ 
-  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = { 
-    val c: KTable[K, java.lang.Long] = inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
+  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
+    val c: KTable[K, java.lang.Long] =
+      inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
     c.mapValues[Long](Long2long _)
   }
 
@@ -71,9 +72,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
    */ 
-  def reduce(reducer: (V, V) => V): KTable[K, V] = {
+  def reduce(reducer: (V, V) => V): KTable[K, V] =
     inner.reduce(reducer.asReducer)
-  }
 
   /**
    * Combine the values of records in this stream by the grouped key.
@@ -102,9 +102,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */ 
   def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR): KTable[K, VR] = {
+    aggregator: (K, V, VR) => VR): KTable[K, VR] =
     inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
-  }
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
@@ -118,9 +117,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    */ 
   def aggregate[VR](initializer: () => VR,
     aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
     inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
-  }
 
   /**
    * Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations.
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 57c44fc..673ab5d 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -42,10 +42,10 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
-   */ 
+   */
   def count(): KTable[K, Long] = {
     val c: KTable[K, java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long(_))
+    c.mapValues[Long](Long2long _)
   }
 
   /**
@@ -56,9 +56,12 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
-   */ 
-  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] =
-    inner.count(materialized)
+   */
+  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
+    val c: KTable[K, java.lang.Long] =
+      inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
+    c.mapValues[Long](Long2long _)
+  }
 
   /**
    * Combine the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -71,12 +74,10 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
    */
   def reduce(adder: (V, V) => V,
-             subtractor: (V, V) => V): KTable[K, V] = {
-
+             subtractor: (V, V) => V): KTable[K, V] =
     // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
     // works perfectly with Scala 2.12 though
     inner.reduce(adder.asReducer, subtractor.asReducer)
-  }
 
   /**
    * Combine the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -91,12 +92,10 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    */
   def reduce(adder: (V, V) => V,
              subtractor: (V, V) => V,
-             materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
-
+             materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
     // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
     // works perfectly with Scala 2.12 though
     inner.reduce(adder.asReducer, subtractor.asReducer, materialized)
-  }
 
   /**
    * Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -111,10 +110,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    */
   def aggregate[VR](initializer: () => VR,
                     adder: (K, V, VR) => VR,
-                    subtractor: (K, V, VR) => VR): KTable[K, VR] = {
-
+                    subtractor: (K, V, VR) => VR): KTable[K, VR] =
     inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator)
-  }
 
   /**
    * Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -131,8 +128,6 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
   def aggregate[VR](initializer: () => VR,
                     adder: (K, V, VR) => VR,
                     subtractor: (K, V, VR) => VR,
-                    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
-
+                    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
     inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
-  }
 }
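
The asReducer comment above exists because Scala 2.11 does not convert a Scala
function literal to Java's Reducer SAM interface on its own, so an explicit
adapter is needed; Scala 2.12 performs the conversion automatically. A sketch
of such an adapter, assuming the shape of the ones in FunctionConversions
(which this patch does not touch):

    import org.apache.kafka.streams.kstream.Reducer

    object ReducerConversionSketch {
      // Wraps a Scala (V, V) => V as the Java Reducer interface, which
      // Scala 2.11 will not do implicitly.
      implicit class ReducerFromFunction[V](val f: (V, V) => V) extends AnyVal {
        def asReducer: Reducer[V] = new Reducer[V] {
          override def apply(value1: V, value2: V): V = f(value1, value2)
        }
      }
    }
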
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 0369ee5..218063e 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -58,9 +58,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#filter`
    */ 
   def filter(predicate: (K, V) => Boolean,
-    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
     inner.filter(predicate.asPredicate, materialized)
-  }
 
   /**
    * Create a new [[KTable]] that consists all records of this [[KTable]] which do <em>not</em> satisfy the given
@@ -70,9 +69,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains only those records that do <em>not</em> satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
    */ 
-  def filterNot(predicate: (K, V) => Boolean): KTable[K, V] = {
+  def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
     inner.filterNot(predicate(_, _))
-  }
 
   /**
    * Create a new [[KTable]] that consists all records of this [[KTable]] which do <em>not</em> satisfy the given
@@ -85,9 +83,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
    */ 
   def filterNot(predicate: (K, V) => Boolean,
-    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
     inner.filterNot(predicate.asPredicate, materialized)
-  }
 
   /**
    * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -99,9 +96,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
    */ 
-  def mapValues[VR](mapper: V => VR): KTable[K, VR] = {
+  def mapValues[VR](mapper: V => VR): KTable[K, VR] =
     inner.mapValues[VR](mapper.asValueMapper)
-  }
 
   /**
    * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -116,9 +112,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
    */ 
   def mapValues[VR](mapper: V => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
     inner.mapValues[VR](mapper.asValueMapper, materialized)
-  }
 
   /**
    * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -130,9 +125,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
    */ 
-  def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] = {
+  def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] =
     inner.mapValues[VR](mapper.asValueMapperWithKey)
-  }
 
   /**
    * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -147,9 +141,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
    */ 
   def mapValues[VR](mapper: (K, V) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
     inner.mapValues[VR](mapper.asValueMapperWithKey)
-  }
 
   /**
    * Convert this changelog stream to a [[KStream]].
@@ -166,9 +159,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KStream]] that contains the same records as this [[KTable]]
    * @see `org.apache.kafka.streams.kstream.KTable#toStream`
    */
-  def toStream[KR](mapper: (K, V) => KR): KStream[KR, V] = {
+  def toStream[KR](mapper: (K, V) => KR): KStream[KR, V] =
     inner.toStream[KR](mapper.asKeyValueMapper)
-  }
 
   /**
    * Re-groups the records of this [[KTable]] using the provided key/value mapper
@@ -179,9 +171,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KGroupedTable]] that contains the re-grouped records of the original [[KTable]]
    * @see `org.apache.kafka.streams.kstream.KTable#groupBy`
    */ 
-  def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] = {
+  def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] =
     inner.groupBy(selector.asKeyValueMapper, serialized)
-  }
 
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
@@ -193,10 +184,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#join`
    */ 
   def join[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] = {
-
+    joiner: (V, VO) => VR): KTable[K, VR] =
     inner.join[VO, VR](other.inner, joiner.asValueJoiner)
-  }
 
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
@@ -211,10 +200,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    */ 
   def join[VO, VR](other: KTable[K, VO],
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
-
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
     inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
-  }
 
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join.
@@ -226,10 +213,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
    */ 
   def leftJoin[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] = {
-
+    joiner: (V, VO) => VR): KTable[K, VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner)
-  }
 
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join.
@@ -244,10 +229,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    */ 
   def leftJoin[VO, VR](other: KTable[K, VO],
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
-
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
-  }
 
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join.
@@ -259,10 +242,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
    */ 
   def outerJoin[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] = {
-
+    joiner: (V, VO) => VR): KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner)
-  }
 
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join.
@@ -277,16 +258,13 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    */ 
   def outerJoin[VO, VR](other: KTable[K, VO],
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
-
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
-  }
 
   /**
    * Get the name of the local state store used that can be used to query this [[KTable]].
    *
    * @return the underlying state store name, or `null` if this [[KTable]] cannot be queried.
    */
-  def queryableStoreName: String =
-    inner.queryableStoreName
+  def queryableStoreName: String = inner.queryableStoreName
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 7e9fa07..1e25554 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -21,9 +21,6 @@ package org.apache.kafka.streams.scala
 package kstream
 
 import org.apache.kafka.streams.kstream.{SessionWindowedKStream => SessionWindowedKStreamJ, _}
-import org.apache.kafka.streams.state.SessionStore
-import org.apache.kafka.common.utils.Bytes
-
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -50,10 +47,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    */
   def aggregate[VR](initializer: () => VR,
     aggregator: (K, V, VR) => VR,
-    merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] = {
-
+    merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] =
     inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger)
-  }
 
   /**
    * Aggregate the values of records in this stream by the grouped key and defined `SessionWindows`.
@@ -69,10 +64,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
   def aggregate[VR](initializer: () => VR,
     aggregator: (K, V, VR) => VR,
     merger: (K, VR, VR) => VR,
-    materialized: Materialized[K, VR, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], VR] = {
-
+    materialized: Materialized[K, VR, ByteArraySessionStore]): KTable[Windowed[K], VR] =
     inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
-  }
 
   /**
    * Count the number of records in this stream by the grouped key into `SessionWindows`.
@@ -83,7 +76,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    */
   def count(): KTable[Windowed[K], Long] = {
     val c: KTable[Windowed[K], java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long(_))
+    c.mapValues[Long](Long2long _)
   }
 
   /**
@@ -94,8 +87,11 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * that represent the latest (rolling) count (i.e., number of records) for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
    */
-  def count(materialized: Materialized[K, Long, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], Long] =
-    inner.count(materialized)
+  def count(materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K], Long] = {
+    val c: KTable[Windowed[K], java.lang.Long] =
+      inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]])
+    c.mapValues[Long](Long2long _)
+  }
 
   /**
    * Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -105,9 +101,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] = {
+  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] =
     inner.reduce((v1, v2) => reducer(v1, v2))
-  }
 
   /**
    * Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -119,7 +114,6 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
    */
   def reduce(reducer: (V, V) => V,
-    materialized: Materialized[K, V, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], V] = {
+    materialized: Materialized[K, V, ByteArraySessionStore]): KTable[Windowed[K], V] =
     inner.reduce(reducer.asReducer, materialized)
-  }
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 226192f..b00d025 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -21,8 +21,6 @@ package org.apache.kafka.streams.scala
 package kstream
 
 import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _}
-import org.apache.kafka.streams.state.WindowStore
-import org.apache.kafka.common.utils.Bytes
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -47,10 +45,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
   def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] = {
-
+    aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] =
     inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
-  }
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
@@ -64,10 +60,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    */
   def aggregate[VR](initializer: () => VR,
     aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], VR] = {
-
+    materialized: Materialized[K, VR, ByteArrayWindowStore]): KTable[Windowed[K], VR] =
     inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
-  }
 
   /**
    * Count the number of records in this stream by the grouped key and the defined windows.
@@ -78,7 +72,7 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    */ 
   def count(): KTable[Windowed[K], Long] = {
     val c: KTable[Windowed[K], java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long(_))
+    c.mapValues[Long](Long2long _)
   }
 
   /**
@@ -89,10 +83,10 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
    */ 
-  def count(materialized: Materialized[K, Long, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] = 
-      inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, WindowStore[Bytes, Array[Byte]]]])
-    c.mapValues[Long](Long2long(_))
+  def count(materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
+    val c: KTable[Windowed[K], java.lang.Long] =
+      inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
+    c.mapValues[Long](Long2long _)
   }
 
   /**
@@ -103,9 +97,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] = {
+  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] =
     inner.reduce(reducer.asReducer)
-  }
 
   /**
    * Combine the values of records in this stream by the grouped key.
@@ -117,8 +110,6 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
    */
   def reduce(reducer: (V, V) => V,
-    materialized: Materialized[K, V, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], V] = {
-
+    materialized: Materialized[K, V, ByteArrayWindowStore]): KTable[Windowed[K], V] =
     inner.reduce(reducer.asReducer, materialized)
-  }
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 17fa35c..12b8c8c 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -30,6 +30,7 @@ import org.junit.rules.TemporaryFolder
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams._
 import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.streams.kstream.Materialized
 
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
 import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -74,7 +75,6 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
   }
 
   @Test def testShouldCountWords(): Unit = {
-
     import Serdes._
 
     val streamsConfiguration = getStreamsConfiguration()
@@ -105,6 +105,37 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
     assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
   }
 
+  @Test def testShouldCountWordsMaterialized(): Unit = {
+    import Serdes._
+
+    val streamsConfiguration = getStreamsConfiguration()
+
+    val streamBuilder = new StreamsBuilder
+    val textLines = streamBuilder.stream[String, String](inputTopic)
+
+    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
+
+    // generate word counts
+    val wordCounts: KTable[String, Long] =
+      textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+        .groupBy((k, v) => v)
+        .count(Materialized.as("word-count"))
+
+    // write to output topic
+    wordCounts.toStream.to(outputTopic)
+
+    val streams: KafkaStreams = new KafkaStreams(streamBuilder.build(), streamsConfiguration)
+    streams.start()
+
+    // produce and consume synchronously
+    val actualWordCounts: java.util.List[KeyValue[String, Long]] = produceNConsume(inputTopic, outputTopic)
+
+    streams.close()
+
+    import collection.JavaConverters._
+    assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
+  }
+
   @Test def testShouldCountWordsJava(): Unit = {
 
     import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
