You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/09/02 17:50:44 UTC

[kafka] branch trunk updated: MINOR: Fix build scala 2.12 build after KAFKA-10020 (#9245)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4662ed4  MINOR: Fix build scala 2.12 build after KAFKA-10020 (#9245)
4662ed4 is described below

commit 4662ed4aac79ed539f3a62140b8b154517837dbd
Author: Yuriy Badalyantc <lm...@gmail.com>
AuthorDate: Thu Sep 3 00:49:47 2020 +0700

    MINOR: Fix Scala 2.12 build after KAFKA-10020 (#9245)
    
    Fixes a build problem in which the old Serdes class, which lives in the
    same package as the tests, shadows the new one that we explicitly import,
    but only in Scala 2.12. Since users (hopefully) don't put their classes
    in our packages, they won't face the same problem.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, David Arthur <mu...@gmail.com>, John Roesler <vv...@apache.org>
---
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala | 16 ++++++------
 .../apache/kafka/streams/scala/TopologyTest.scala  | 30 +++++++++++-----------
 .../apache/kafka/streams/scala/WordCountTest.scala |  8 +++---
 3 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index d4a1aa7..0c24454 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.scala
 import java.util.Properties
 
 import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
-import org.apache.kafka.streams.scala.serialization.Serdes
+import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.scala.utils.StreamToTableJoinScalaIntegrationTestBase
@@ -133,16 +133,16 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
 
     val streamsConfiguration: Properties = getStreamsConfiguration()
 
-    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
-    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
+    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
+    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
 
     val builder: StreamsBuilderJ = new StreamsBuilderJ()
 
     val userClicksStream: KStreamJ[String, JLong] =
-      builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+      builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
 
     val userRegionsTable: KTableJ[String, String] =
-      builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
+      builder.table[String, String](userRegionsTopicJ, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
 
     // Join the stream against the table.
     val valueJoinerJ: ValueJoiner[JLong, String, (String, JLong)] =
@@ -150,7 +150,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
     val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream.leftJoin(
       userRegionsTable,
       valueJoinerJ,
-      Joined.`with`[String, JLong, String](Serdes.stringSerde, Serdes.javaLongSerde, Serdes.stringSerde)
+      Joined.`with`[String, JLong, String](NewSerdes.stringSerde, NewSerdes.javaLongSerde, NewSerdes.stringSerde)
     )
 
     // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
@@ -160,11 +160,11 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
 
     // Compute the total per region by summing the individual click counts per region.
     val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
-      .groupByKey(Grouped.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+      .groupByKey(Grouped.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
       .reduce((v1, v2) => v1 + v2)
 
     // Write the (continuously updating) results to the output topic.
-    clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+    clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
 
     val streams = new KafkaStreamsJ(builder.build(), streamsConfiguration)
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 1262431..eeed948 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.streams.kstream.{
 }
 import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier}
 import org.apache.kafka.streams.scala.ImplicitConversions._
-import org.apache.kafka.streams.scala.serialization.Serdes
+import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
 import org.apache.kafka.streams.scala.serialization.Serdes._
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.{KeyValue, StreamsConfig, TopologyDescription, StreamsBuilder => StreamsBuilderJ}
@@ -355,7 +355,7 @@ class TopologyTest {
       val builder: StreamsBuilder = new StreamsBuilder
 
       val sourceStream: KStream[String, String] =
-        builder.stream(inputTopic)(Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
+        builder.stream(inputTopic)(Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
 
       val mappedStream: KStream[String, String] =
         sourceStream.map((k: String, v: String) => (k.toUpperCase(Locale.getDefault), v))
@@ -365,30 +365,30 @@ class TopologyTest {
         .process(() => new SimpleProcessor(processorValueCollector))
 
       val stream2 = mappedStream.groupByKey
-        .aggregate(0)(aggregator)(Materialized.`with`(Serdes.stringSerde, Serdes.intSerde))
+        .aggregate(0)(aggregator)(Materialized.`with`(NewSerdes.stringSerde, NewSerdes.intSerde))
         .toStream
-      stream2.to(AGGREGATION_TOPIC)(Produced.`with`(Serdes.stringSerde, Serdes.intSerde))
+      stream2.to(AGGREGATION_TOPIC)(Produced.`with`(NewSerdes.stringSerde, NewSerdes.intSerde))
 
       // adding operators for case where the repartition node is further downstream
       val stream3 = mappedStream
         .filter((_: String, _: String) => true)
         .peek((k: String, v: String) => System.out.println(k + ":" + v))
         .groupByKey
-        .reduce(reducer)(Materialized.`with`(Serdes.stringSerde, Serdes.stringSerde))
+        .reduce(reducer)(Materialized.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
         .toStream
-      stream3.to(REDUCE_TOPIC)(Produced.`with`(Serdes.stringSerde, Serdes.stringSerde))
+      stream3.to(REDUCE_TOPIC)(Produced.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
 
       mappedStream
         .filter((k: String, _: String) => k == "A")
         .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
-          StreamJoined.`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.intSerde)
+          StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde)
         )
         .to(JOINED_TOPIC)
 
       mappedStream
         .filter((k: String, _: String) => k == "A")
         .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
-          StreamJoined.`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.stringSerde)
+          StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde)
         )
         .to(JOINED_TOPIC)
 
@@ -411,7 +411,7 @@ class TopologyTest {
 
       val builder = new StreamsBuilderJ
 
-      val sourceStream = builder.stream(inputTopic, Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
+      val sourceStream = builder.stream(inputTopic, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
 
       val mappedStream: KStreamJ[String, String] =
         sourceStream.map(keyValueMapper)
@@ -421,25 +421,25 @@ class TopologyTest {
         .process(processorSupplier)
 
       val stream2: KStreamJ[String, Integer] = mappedStream.groupByKey
-        .aggregate(initializer, aggregator, MaterializedJ.`with`(Serdes.stringSerde, SerdesJ.Integer))
+        .aggregate(initializer, aggregator, MaterializedJ.`with`(NewSerdes.stringSerde, SerdesJ.Integer))
         .toStream
-      stream2.to(AGGREGATION_TOPIC, Produced.`with`(Serdes.stringSerde, SerdesJ.Integer))
+      stream2.to(AGGREGATION_TOPIC, Produced.`with`(NewSerdes.stringSerde, SerdesJ.Integer))
 
       // adding operators for case where the repartition node is further downstream
       val stream3 = mappedStream
         .filter((_, _) => true)
         .peek((k, v) => System.out.println(k + ":" + v))
         .groupByKey
-        .reduce(reducer, MaterializedJ.`with`(Serdes.stringSerde, Serdes.stringSerde))
+        .reduce(reducer, MaterializedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
         .toStream
-      stream3.to(REDUCE_TOPIC, Produced.`with`(Serdes.stringSerde, Serdes.stringSerde))
+      stream3.to(REDUCE_TOPIC, Produced.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
 
       mappedStream
         .filter((key, _) => key == "A")
         .join[Integer, String](stream2,
                                valueJoiner2,
                                JoinWindows.of(Duration.ofMillis(5000)),
-                               StreamJoinedJ.`with`(Serdes.stringSerde, Serdes.stringSerde, SerdesJ.Integer))
+                               StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer))
         .to(JOINED_TOPIC)
 
       mappedStream
@@ -447,7 +447,7 @@ class TopologyTest {
         .join(stream3,
               valueJoiner3,
               JoinWindows.of(Duration.ofMillis(5000)),
-              StreamJoinedJ.`with`(Serdes.stringSerde, Serdes.stringSerde, SerdesJ.String))
+              StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String))
         .to(JOINED_TOPIC)
 
       builder
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index bd1b279..16b3493 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -25,7 +25,7 @@ import java.util.regex.Pattern
 import org.junit.Assert._
 import org.junit._
 import org.junit.rules.TemporaryFolder
-import org.apache.kafka.streams.scala.serialization.Serdes
+import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
 import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
@@ -144,8 +144,8 @@ class WordCountTest extends WordCountTestData {
     import scala.jdk.CollectionConverters._
 
     val streamsConfiguration = getStreamsConfiguration()
-    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
-    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
+    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
+    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
 
     val streamBuilder = new StreamsBuilderJ
     val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)
@@ -162,7 +162,7 @@ class WordCountTest extends WordCountTestData {
 
     val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
 
-    wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+    wordCounts.toStream.to(outputTopicJ, Produced.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
 
     val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
     streams.start()