You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/11 17:15:55 UTC
[kafka] branch trunk updated: MINOR: Count fix and Type alias refactor in Streams Scala API (#4966)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 40d191b MINOR: Count fix and Type alias refactor in Streams Scala API (#4966)
40d191b is described below
commit 40d191b563cb8e94b28b15a217a759a1df9b6759
Author: Joan Goyeau <jo...@goyeau.com>
AuthorDate: Fri May 11 18:15:48 2018 +0100
MINOR: Count fix and Type alias refactor in Streams Scala API (#4966)
Reviewers: Debasish Ghosh <dg...@acm.org>, Guozhang Wang <gu...@confluent.io>
---
.../scala/org/apache/kafka/streams/package.scala | 4 +-
.../kafka/streams/scala/StreamsBuilder.scala | 1 -
.../streams/scala/kstream/KGroupedStream.scala | 14 +++---
.../streams/scala/kstream/KGroupedTable.scala | 29 +++++-------
.../kafka/streams/scala/kstream/KTable.scala | 54 +++++++---------------
.../scala/kstream/SessionWindowedKStream.scala | 26 ++++-------
.../scala/kstream/TimeWindowedKStream.scala | 27 ++++-------
.../apache/kafka/streams/scala/WordCountTest.scala | 33 ++++++++++++-
8 files changed, 88 insertions(+), 100 deletions(-)
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala
index 864fd19..01f9833 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala
@@ -19,9 +19,11 @@
*/
package org.apache.kafka.streams
-import org.apache.kafka.streams.state.KeyValueStore
+import org.apache.kafka.streams.state.{KeyValueStore, SessionStore, WindowStore}
import org.apache.kafka.common.utils.Bytes
package object scala {
type ByteArrayKeyValueStore = KeyValueStore[Bytes, Array[Byte]]
+ type ByteArraySessionStore = SessionStore[Bytes, Array[Byte]]
+ type ByteArrayWindowStore = WindowStore[Bytes, Array[Byte]]
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 397af32..3f58dd3 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.{GlobalKTable, Materialized}
import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
import org.apache.kafka.streams.state.StoreBuilder
import org.apache.kafka.streams.{Consumed, StreamsBuilder => StreamsBuilderJ, Topology}
-
import org.apache.kafka.streams.scala.kstream._
import ImplicitConversions._
import scala.collection.JavaConverters._
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 4cee0ac..acffb1f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -58,8 +58,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* represent the latest (rolling) count (i.e., number of records) for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
*/
- def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
- val c: KTable[K, java.lang.Long] = inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
+ def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
+ val c: KTable[K, java.lang.Long] =
+ inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
c.mapValues[Long](Long2long _)
}
@@ -71,9 +72,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
*/
- def reduce(reducer: (V, V) => V): KTable[K, V] = {
+ def reduce(reducer: (V, V) => V): KTable[K, V] =
inner.reduce(reducer.asReducer)
- }
/**
* Combine the values of records in this stream by the grouped key.
@@ -102,9 +102,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
*/
def aggregate[VR](initializer: () => VR,
- aggregator: (K, V, VR) => VR): KTable[K, VR] = {
+ aggregator: (K, V, VR) => VR): KTable[K, VR] =
inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
- }
/**
* Aggregate the values of records in this stream by the grouped key.
@@ -118,9 +117,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
*/
def aggregate[VR](initializer: () => VR,
aggregator: (K, V, VR) => VR,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
- }
/**
* Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations.
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 57c44fc..673ab5d 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -42,10 +42,10 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
* represent the latest (rolling) count (i.e., number of records) for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
- */
+ */
def count(): KTable[K, Long] = {
val c: KTable[K, java.lang.Long] = inner.count()
- c.mapValues[Long](Long2long(_))
+ c.mapValues[Long](Long2long _)
}
/**
@@ -56,9 +56,12 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
* represent the latest (rolling) count (i.e., number of records) for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
- */
- def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] =
- inner.count(materialized)
+ */
+ def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
+ val c: KTable[K, java.lang.Long] =
+ inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
+ c.mapValues[Long](Long2long _)
+ }
/**
* Combine the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -71,12 +74,10 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
*/
def reduce(adder: (V, V) => V,
- subtractor: (V, V) => V): KTable[K, V] = {
-
+ subtractor: (V, V) => V): KTable[K, V] =
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
// works perfectly with Scala 2.12 though
inner.reduce(adder.asReducer, subtractor.asReducer)
- }
/**
* Combine the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -91,12 +92,10 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
*/
def reduce(adder: (V, V) => V,
subtractor: (V, V) => V,
- materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
-
+ materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
// works perfectly with Scala 2.12 though
inner.reduce(adder.asReducer, subtractor.asReducer, materialized)
- }
/**
* Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -111,10 +110,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
*/
def aggregate[VR](initializer: () => VR,
adder: (K, V, VR) => VR,
- subtractor: (K, V, VR) => VR): KTable[K, VR] = {
-
+ subtractor: (K, V, VR) => VR): KTable[K, VR] =
inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator)
- }
/**
* Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -131,8 +128,6 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
def aggregate[VR](initializer: () => VR,
adder: (K, V, VR) => VR,
subtractor: (K, V, VR) => VR,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
-
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
- }
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 0369ee5..218063e 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -58,9 +58,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filter`
*/
def filter(predicate: (K, V) => Boolean,
- materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+ materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
inner.filter(predicate.asPredicate, materialized)
- }
/**
* Create a new [[KTable]] that consists all records of this [[KTable]] which do <em>not</em> satisfy the given
@@ -70,9 +69,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains only those records that do <em>not</em> satisfy the given predicate
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
*/
- def filterNot(predicate: (K, V) => Boolean): KTable[K, V] = {
+ def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
inner.filterNot(predicate(_, _))
- }
/**
* Create a new [[KTable]] that consists all records of this [[KTable]] which do <em>not</em> satisfy the given
@@ -85,9 +83,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
*/
def filterNot(predicate: (K, V) => Boolean,
- materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+ materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
inner.filterNot(predicate.asPredicate, materialized)
- }
/**
* Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -99,9 +96,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
- def mapValues[VR](mapper: V => VR): KTable[K, VR] = {
+ def mapValues[VR](mapper: V => VR): KTable[K, VR] =
inner.mapValues[VR](mapper.asValueMapper)
- }
/**
* Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -116,9 +112,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
def mapValues[VR](mapper: V => VR,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
inner.mapValues[VR](mapper.asValueMapper, materialized)
- }
/**
* Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -130,9 +125,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
- def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] = {
+ def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] =
inner.mapValues[VR](mapper.asValueMapperWithKey)
- }
/**
* Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -147,9 +141,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
def mapValues[VR](mapper: (K, V) => VR,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
inner.mapValues[VR](mapper.asValueMapperWithKey)
- }
/**
* Convert this changelog stream to a [[KStream]].
@@ -166,9 +159,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KStream]] that contains the same records as this [[KTable]]
* @see `org.apache.kafka.streams.kstream.KTable#toStream`
*/
- def toStream[KR](mapper: (K, V) => KR): KStream[KR, V] = {
+ def toStream[KR](mapper: (K, V) => KR): KStream[KR, V] =
inner.toStream[KR](mapper.asKeyValueMapper)
- }
/**
* Re-groups the records of this [[KTable]] using the provided key/value mapper
@@ -179,9 +171,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KGroupedTable]] that contains the re-grouped records of the original [[KTable]]
* @see `org.apache.kafka.streams.kstream.KTable#groupBy`
*/
- def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] = {
+ def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] =
inner.groupBy(selector.asKeyValueMapper, serialized)
- }
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
@@ -193,10 +184,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#join`
*/
def join[VO, VR](other: KTable[K, VO],
- joiner: (V, VO) => VR): KTable[K, VR] = {
-
+ joiner: (V, VO) => VR): KTable[K, VR] =
inner.join[VO, VR](other.inner, joiner.asValueJoiner)
- }
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
@@ -211,10 +200,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*/
def join[VO, VR](other: KTable[K, VO],
joiner: (V, VO) => VR,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
-
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
- }
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join.
@@ -226,10 +213,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
*/
def leftJoin[VO, VR](other: KTable[K, VO],
- joiner: (V, VO) => VR): KTable[K, VR] = {
-
+ joiner: (V, VO) => VR): KTable[K, VR] =
inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner)
- }
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join.
@@ -244,10 +229,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*/
def leftJoin[VO, VR](other: KTable[K, VO],
joiner: (V, VO) => VR,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
-
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
- }
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join.
@@ -259,10 +242,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
*/
def outerJoin[VO, VR](other: KTable[K, VO],
- joiner: (V, VO) => VR): KTable[K, VR] = {
-
+ joiner: (V, VO) => VR): KTable[K, VR] =
inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner)
- }
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join.
@@ -277,16 +258,13 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*/
def outerJoin[VO, VR](other: KTable[K, VO],
joiner: (V, VO) => VR,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
-
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
- }
/**
* Get the name of the local state store used that can be used to query this [[KTable]].
*
* @return the underlying state store name, or `null` if this [[KTable]] cannot be queried.
*/
- def queryableStoreName: String =
- inner.queryableStoreName
+ def queryableStoreName: String = inner.queryableStoreName
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 7e9fa07..1e25554 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -21,9 +21,6 @@ package org.apache.kafka.streams.scala
package kstream
import org.apache.kafka.streams.kstream.{SessionWindowedKStream => SessionWindowedKStreamJ, _}
-import org.apache.kafka.streams.state.SessionStore
-import org.apache.kafka.common.utils.Bytes
-
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
@@ -50,10 +47,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
*/
def aggregate[VR](initializer: () => VR,
aggregator: (K, V, VR) => VR,
- merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] = {
-
+ merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] =
inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger)
- }
/**
* Aggregate the values of records in this stream by the grouped key and defined `SessionWindows`.
@@ -69,10 +64,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
def aggregate[VR](initializer: () => VR,
aggregator: (K, V, VR) => VR,
merger: (K, VR, VR) => VR,
- materialized: Materialized[K, VR, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], VR] = {
-
+ materialized: Materialized[K, VR, ByteArraySessionStore]): KTable[Windowed[K], VR] =
inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
- }
/**
* Count the number of records in this stream by the grouped key into `SessionWindows`.
@@ -83,7 +76,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
*/
def count(): KTable[Windowed[K], Long] = {
val c: KTable[Windowed[K], java.lang.Long] = inner.count()
- c.mapValues[Long](Long2long(_))
+ c.mapValues[Long](Long2long _)
}
/**
@@ -94,8 +87,11 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
*/
- def count(materialized: Materialized[K, Long, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], Long] =
- inner.count(materialized)
+ def count(materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K], Long] = {
+ val c: KTable[Windowed[K], java.lang.Long] =
+ inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]])
+ c.mapValues[Long](Long2long _)
+ }
/**
* Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -105,9 +101,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
* the latest (rolling) aggregate for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
*/
- def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] = {
+ def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] =
inner.reduce((v1, v2) => reducer(v1, v2))
- }
/**
* Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -119,7 +114,6 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
*/
def reduce(reducer: (V, V) => V,
- materialized: Materialized[K, V, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], V] = {
+ materialized: Materialized[K, V, ByteArraySessionStore]): KTable[Windowed[K], V] =
inner.reduce(reducer.asReducer, materialized)
- }
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 226192f..b00d025 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -21,8 +21,6 @@ package org.apache.kafka.streams.scala
package kstream
import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _}
-import org.apache.kafka.streams.state.WindowStore
-import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
@@ -47,10 +45,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
*/
def aggregate[VR](initializer: () => VR,
- aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] = {
-
+ aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] =
inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
- }
/**
* Aggregate the values of records in this stream by the grouped key.
@@ -64,10 +60,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
*/
def aggregate[VR](initializer: () => VR,
aggregator: (K, V, VR) => VR,
- materialized: Materialized[K, VR, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], VR] = {
-
+ materialized: Materialized[K, VR, ByteArrayWindowStore]): KTable[Windowed[K], VR] =
inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
- }
/**
* Count the number of records in this stream by the grouped key and the defined windows.
@@ -78,7 +72,7 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
*/
def count(): KTable[Windowed[K], Long] = {
val c: KTable[Windowed[K], java.lang.Long] = inner.count()
- c.mapValues[Long](Long2long(_))
+ c.mapValues[Long](Long2long _)
}
/**
@@ -89,10 +83,10 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
* represent the latest (rolling) count (i.e., number of records) for each key
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
*/
- def count(materialized: Materialized[K, Long, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], Long] = {
- val c: KTable[Windowed[K], java.lang.Long] =
- inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, WindowStore[Bytes, Array[Byte]]]])
- c.mapValues[Long](Long2long(_))
+ def count(materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
+ val c: KTable[Windowed[K], java.lang.Long] =
+ inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
+ c.mapValues[Long](Long2long _)
}
/**
@@ -103,9 +97,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
*/
- def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] = {
+ def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] =
inner.reduce(reducer.asReducer)
- }
/**
* Combine the values of records in this stream by the grouped key.
@@ -117,8 +110,6 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
*/
def reduce(reducer: (V, V) => V,
- materialized: Materialized[K, V, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], V] = {
-
+ materialized: Materialized[K, V, ByteArrayWindowStore]): KTable[Windowed[K], V] =
inner.reduce(reducer.asReducer, materialized)
- }
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 17fa35c..12b8c8c 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -30,6 +30,7 @@ import org.junit.rules.TemporaryFolder
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams._
import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -74,7 +75,6 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
}
@Test def testShouldCountWords(): Unit = {
-
import Serdes._
val streamsConfiguration = getStreamsConfiguration()
@@ -105,6 +105,37 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
}
+ @Test def testShouldCountWordsMaterialized(): Unit = {
+ import Serdes._
+
+ val streamsConfiguration = getStreamsConfiguration()
+
+ val streamBuilder = new StreamsBuilder
+ val textLines = streamBuilder.stream[String, String](inputTopic)
+
+ val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
+
+ // generate word counts
+ val wordCounts: KTable[String, Long] =
+ textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+ .groupBy((k, v) => v)
+ .count(Materialized.as("word-count"))
+
+ // write to output topic
+ wordCounts.toStream.to(outputTopic)
+
+ val streams: KafkaStreams = new KafkaStreams(streamBuilder.build(), streamsConfiguration)
+ streams.start()
+
+ // produce and consume synchronously
+ val actualWordCounts: java.util.List[KeyValue[String, Long]] = produceNConsume(inputTopic, outputTopic)
+
+ streams.close()
+
+ import collection.JavaConverters._
+ assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
+ }
+
@Test def testShouldCountWordsJava(): Unit = {
import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.