You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/09/11 23:17:53 UTC

[kafka] branch trunk updated: KAFKA-7386: streams-scala should not cache serdes (#5622)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9dac615  KAFKA-7386: streams-scala should not cache serdes (#5622)
9dac615 is described below

commit 9dac615d228c5b3464c6322aea9f9ce70f9ef37b
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Sep 11 18:17:47 2018 -0500

    KAFKA-7386: streams-scala should not cache serdes (#5622)
    
    Currently, scala.Serdes.String, for example, invokes Serdes.String() once and caches the result.
    
    However, the implementation of the String serde has a non-empty configure method whose behavior depends on whether the serde is used as a key serde or a value serde. As a result, we won't get correct execution if we create a single serde instance and use it for both keys and values.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/scala/Serdes.scala    | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
index 8bfb083..02e5380 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -25,17 +25,17 @@ import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, S
 import org.apache.kafka.streams.kstream.WindowedSerdes
 
 object Serdes {
-  implicit val String: Serde[String] = JSerdes.String()
-  implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
-  implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
-  implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
-  implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
-  implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
-  implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
-  implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
-  implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
-  implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
-  implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def String: Serde[String] = JSerdes.String()
+  implicit def Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit def JavaLong: Serde[java.lang.Long] = JSerdes.Long()
+  implicit def ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
+  implicit def Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+  implicit def Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit def JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
+  implicit def Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
+  implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
 
   implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
   implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] =