You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/08 16:15:39 UTC
[kafka] branch trunk updated: MINOR: Make Serdes less confusing in
Scala (#4963)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b88d70b MINOR: Make Serdes less confusing in Scala (#4963)
b88d70b is described below
commit b88d70b53290af715034a1f772a271f7e44505fd
Author: Joan Goyeau <jo...@goyeau.com>
AuthorDate: Tue May 8 17:15:31 2018 +0100
MINOR: Make Serdes less confusing in Scala (#4963)
Serdes are confusing in the Scala wrapper:
* We have wrappers around Serializer, Deserializer and Serde which are not very useful.
* We have Serdes in 2 places org.apache.kafka.common.serialization.Serde and in DefaultSerdes, instead we should be having only one place where to find all the Serdes.
I wanted to do this PR before the release as this is a breaking change.
This shouldn't add more so the current tests should be enough.
Reviewers: Debasish Ghosh <dg...@acm.org>, Guozhang Wang <gu...@confluent.io>
---
docs/streams/developer-guide/dsl-api.html | 11 ++--
docs/streams/index.html | 2 +-
.../apache/kafka/streams/scala/DefaultSerdes.scala | 47 --------------
.../apache/kafka/streams/scala/ScalaSerde.scala | 70 ---------------------
.../org/apache/kafka/streams/scala/Serdes.scala | 71 ++++++++++++++++++++++
.../kafka/streams/scala/StreamsBuilder.scala | 4 +-
.../kafka/streams/scala/kstream/KStream.scala | 12 ++--
...bleJoinScalaIntegrationTestImplicitSerdes.scala | 16 ++---
.../apache/kafka/streams/scala/TopologyTest.scala | 14 ++---
.../apache/kafka/streams/scala/WordCountTest.scala | 10 +--
10 files changed, 105 insertions(+), 152 deletions(-)
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 2b25072..687dff9 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3165,8 +3165,7 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
<p>The library also has several utility abstractions and modules that the user needs to use for proper semantics.</p>
<ul>
<li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.ImplicitConversions</span></code>: Module that brings into scope the implicit conversions between the Scala and Java classes.</li>
- <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.DefaultSerdes</span></code>: Module that brings into scope the implicit values of all primitive SerDes.</li>
- <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.ScalaSerde</span></code>: Base abstraction that can be used to implement custom SerDes in a type safe way.</li>
+ <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.Serdes</span></code>: Module that contains all primitive SerDes that can be imported as implicits and a helper to create custom SerDes.</li>
</ul>
<p>The library is cross-built with Scala 2.11 and 2.12. To reference the library compiled against Scala 2.11 include the following in your maven <code>pom.xml</code> add the following:</p>
<pre class="brush: xml;">
@@ -3197,7 +3196,7 @@ import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
object WordCountApplication extends App {
- import DefaultSerdes._
+ import Serdes._
val config: Properties = {
val p = new Properties()
@@ -3235,7 +3234,7 @@ object WordCountApplication extends App {
// that will set up all Serialized, Produced, Consumed and Joined instances.
// So all APIs below that accept Serialized, Produced, Consumed or Joined will
// get these instances automatically
-import DefaultSerdes._
+import Serdes._
val builder = new StreamsBuilder()
@@ -3260,7 +3259,7 @@ clicksPerRegion.toStream.to(outputTopic)
<p>Quite a few things are going on in the above code snippet that may warrant a few lines of elaboration:</p>
<ol>
<li>The code snippet does not depend on any config defined SerDes. In fact any SerDes defined as part of the config will be ignored.</li>
- <li>All SerDes are picked up from the implicits in scope. And <code class="docutils literal"><span class="pre">import DefaultSerdes._</span></code> brings all necessary SerDes in scope.</li>
+ <li>All SerDes are picked up from the implicits in scope. And <code class="docutils literal"><span class="pre">import Serdes._</span></code> brings all necessary SerDes in scope.</li>
<li>This is an example of compile time type safety that we don't have in the Java APIs.</li>
<li>The code looks less verbose and more focused towards the actual transformation that it does on the data stream.</li>
</ol>
@@ -3277,7 +3276,7 @@ case class UserClicks(clicks: Long)
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde
// Primitive SerDes
-import DefaultSerdes._
+import Serdes._
// And then business as usual ..
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 72e1323..6dfaf6b 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -261,7 +261,7 @@ import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
object WordCountApplication extends App {
- import DefaultSerdes._
+ import Serdes._
val config: Properties = {
val p = new Properties()
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala
deleted file mode 100644
index 3f2840e..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- * Copyright (C) 2017-2018 Alexis Seigneurin.
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import java.nio.ByteBuffer
-
-import org.apache.kafka.common.serialization.{Serde, Serdes}
-import org.apache.kafka.common.utils.Bytes
-import org.apache.kafka.streams.kstream.WindowedSerdes
-
-
-/**
- * Implicit values for default serdes.
- * <p>
- * Bring them in scope for default serializers / de-serializers to work.
- */
-object DefaultSerdes {
- implicit val stringSerde: Serde[String] = Serdes.String()
- implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
- implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
- implicit val bytesSerde: Serde[Bytes] = Serdes.Bytes()
- implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
- implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
- implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
- implicit val shortSerde: Serde[Short] = Serdes.Short().asInstanceOf[Serde[Short]]
- implicit val byteBufferSerde: Serde[ByteBuffer] = Serdes.ByteBuffer()
-
- implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
- implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
-}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala
deleted file mode 100644
index 06afcae..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- * Copyright (C) 2017-2018 Alexis Seigneurin.
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import org.apache.kafka.common.serialization.{Serde, Deserializer => JDeserializer, Serializer => JSerializer}
-
-trait ScalaSerde[T] extends Serde[T] {
- override def deserializer(): JDeserializer[T]
-
- override def serializer(): JSerializer[T]
-
- override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
-
- override def close(): Unit = ()
-}
-
-trait SimpleScalaSerde[T >: Null] extends Serde[T] with ScalaSerde[T] {
- def serialize(data: T): Array[Byte]
- def deserialize(data: Array[Byte]): Option[T]
-
- private def outerSerialize(data: T): Array[Byte] = serialize(data)
- private def outerDeserialize(data: Array[Byte]): Option[T] = deserialize(data)
-
- override def deserializer(): Deserializer[T] = new Deserializer[T] {
- override def deserialize(data: Array[Byte]): Option[T] = outerDeserialize(data)
- }
-
- override def serializer(): Serializer[T] = new Serializer[T] {
- override def serialize(data: T): Array[Byte] = outerSerialize(data)
- }
-}
-
-trait Deserializer[T >: Null] extends JDeserializer[T] {
- override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
-
- override def close(): Unit = ()
-
- override def deserialize(topic: String, data: Array[Byte]): T =
- Option(data).flatMap(deserialize).orNull
-
- def deserialize(data: Array[Byte]): Option[T]
-}
-
-trait Serializer[T] extends JSerializer[T] {
- override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
-
- override def close(): Unit = ()
-
- override def serialize(topic: String, data: T): Array[Byte] =
- Option(data).map(serialize).orNull
-
- def serialize(data: T): Array[Byte]
-}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
new file mode 100644
index 0000000..a0ffffa
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util
+
+import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, Serdes => JSerdes}
+import org.apache.kafka.streams.kstream.WindowedSerdes
+
+object Serdes {
+ implicit val String: Serde[String] = JSerdes.String()
+ implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+ implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
+ implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
+ implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+ implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+ implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
+ implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
+ implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
+ implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+ implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+
+ implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
+ implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
+
+ def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] =
+ JSerdes.serdeFrom(
+ new Serializer[T] {
+ override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
+ override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+ override def close(): Unit = ()
+ },
+ new Deserializer[T] {
+ override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
+ override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+ override def close(): Unit = ()
+ }
+ )
+
+ def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
+ deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+ JSerdes.serdeFrom(
+ new Serializer[T] {
+ override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
+ override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+ override def close(): Unit = ()
+ },
+ new Deserializer[T] {
+ override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
+ override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+ override def close(): Unit = ()
+ }
+ )
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 9e6e204..397af32 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -48,7 +48,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* import ImplicitConversions._
*
* // Bring implicit default serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* val builder = new StreamsBuilder()
*
@@ -98,7 +98,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* import ImplicitConversions._
*
* // Bring implicit default serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* val builder = new StreamsBuilder()
*
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 7634b95..e3e8470 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -206,7 +206,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Example:
*
* // brings implicit serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* //..
* val clicksPerRegion: KTable[String, Long] = //..
@@ -238,7 +238,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Example:
*
* // brings implicit serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* //..
* val clicksPerRegion: KTable[String, Long] = //..
@@ -354,7 +354,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Example:
*
* // brings implicit serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* val clicksPerRegion: KTable[String, Long] =
* userClicksStream
@@ -362,7 +362,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* .map((_, regionWithClicks) => regionWithClicks)
*
* // the groupByKey gets the Serialized instance through an implicit conversion of the
- * // serdes brought into scope through the import DefaultSerdes._ above
+ * // serdes brought into scope through the import Serdes._ above
* .groupByKey
* .reduce(_ + _)
*
@@ -388,7 +388,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Example:
*
* // brings implicit serdes in scope
- * import DefaultSerdes._
+ * import Serdes._
*
* val textLines = streamBuilder.stream[String, String](inputTopic)
*
@@ -398,7 +398,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* textLines.flatMapValues(v => pattern.split(v.toLowerCase))
*
* // the groupBy gets the Serialized instance through an implicit conversion of the
- * // serdes brought into scope through the import DefaultSerdes._ above
+ * // serdes brought into scope through the import Serdes._ above
* .groupBy((k, v) => v)
*
* .count()
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index e701431..113458e 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -73,7 +73,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
// DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
// Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
// get these instances automatically
- import DefaultSerdes._
+ import Serdes._
val streamsConfiguration: Properties = getStreamsConfiguration()
@@ -122,16 +122,16 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
val streamsConfiguration: Properties = getStreamsConfiguration()
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
- streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
val builder: StreamsBuilderJ = new StreamsBuilderJ()
val userClicksStream: KStreamJ[String, JLong] =
- builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String(), Serdes.Long()))
+ builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String, Serdes.JavaLong))
val userRegionsTable: KTableJ[String, String] =
- builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String(), Serdes.String()))
+ builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String, Serdes.String))
// Join the stream against the table.
val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
@@ -140,7 +140,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
def apply(clicks: JLong, region: String): (String, JLong) =
(if (region == null) "UNKNOWN" else region, clicks)
},
- Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String()))
+ Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
@@ -152,7 +152,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
- .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long()))
+ .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
.reduce {
new Reducer[JLong] {
def apply(v1: JLong, v2: JLong) = v1 + v2
@@ -160,7 +160,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
}
// Write the (continuously updating) results to the output topic.
- clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long()))
+ clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong))
val streams: KafkaStreamsJ = new KafkaStreamsJ(builder.build(), streamsConfiguration)
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 71d4834..9495ea7 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -52,7 +52,7 @@ class TopologyTest extends JUnitSuite {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
- import DefaultSerdes._
+ import Serdes._
import collection.JavaConverters._
val streamBuilder = new StreamsBuilder
@@ -87,7 +87,7 @@ class TopologyTest extends JUnitSuite {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
- import DefaultSerdes._
+ import Serdes._
import collection.JavaConverters._
val streamBuilder = new StreamsBuilder
@@ -132,7 +132,7 @@ class TopologyTest extends JUnitSuite {
// build the Scala topology
def getTopologyScala(): TopologyDescription = {
- import DefaultSerdes._
+ import Serdes._
val builder = new StreamsBuilder()
@@ -158,10 +158,10 @@ class TopologyTest extends JUnitSuite {
val builder: StreamsBuilderJ = new StreamsBuilderJ()
val userClicksStream: KStreamJ[String, JLong] =
- builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String(), Serdes.Long()))
+ builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String, Serdes.JavaLong))
val userRegionsTable: KTableJ[String, String] =
- builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String(), Serdes.String()))
+ builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String, Serdes.String))
// Join the stream against the table.
val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
@@ -170,7 +170,7 @@ class TopologyTest extends JUnitSuite {
def apply(clicks: JLong, region: String): (String, JLong) =
(if (region == null) "UNKNOWN" else region, clicks)
},
- Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String()))
+ Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
@@ -182,7 +182,7 @@ class TopologyTest extends JUnitSuite {
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
- .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long()))
+ .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
.reduce {
new Reducer[JLong] {
def apply(v1: JLong, v2: JLong) = v1 + v2
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index e827a3c..17fa35c 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -75,7 +75,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
@Test def testShouldCountWords(): Unit = {
- import DefaultSerdes._
+ import Serdes._
val streamsConfiguration = getStreamsConfiguration()
@@ -112,8 +112,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
import collection.JavaConverters._
val streamsConfiguration = getStreamsConfiguration()
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
- streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
val streamBuilder = new StreamsBuilderJ
val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)
@@ -134,7 +134,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
- wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long()))
+ wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong))
val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
streams.start()
@@ -153,7 +153,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath())
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
streamsConfiguration
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.