You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/09/02 17:50:44 UTC
[kafka] branch trunk updated: MINOR: Fix Scala 2.12 build
after KAFKA-10020 (#9245)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4662ed4 MINOR: Fix Scala 2.12 build after KAFKA-10020 (#9245)
4662ed4 is described below
commit 4662ed4aac79ed539f3a62140b8b154517837dbd
Author: Yuriy Badalyantc <lm...@gmail.com>
AuthorDate: Thu Sep 3 00:49:47 2020 +0700
MINOR: Fix Scala 2.12 build after KAFKA-10020 (#9245)
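Fixes a build problem in which the old Serdes class, which lives in the
same package as the tests (org.apache.kafka.streams.scala), shadows the
new one we explicitly import (org.apache.kafka.streams.scala.serialization.Serdes),
but only in Scala 2.12. The tests now import the new one under the alias
NewSerdes so the reference is unambiguous. Since users (hopefully) don't
put their classes in our packages, they won't face the same problem.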
Reviewers: Chia-Ping Tsai <ch...@gmail.com>, David Arthur <mu...@gmail.com>, John Roesler <vv...@apache.org>
---
...bleJoinScalaIntegrationTestImplicitSerdes.scala | 16 ++++++------
.../apache/kafka/streams/scala/TopologyTest.scala | 30 +++++++++++-----------
.../apache/kafka/streams/scala/WordCountTest.scala | 8 +++---
3 files changed, 27 insertions(+), 27 deletions(-)
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index d4a1aa7..0c24454 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
-import org.apache.kafka.streams.scala.serialization.Serdes
+import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.utils.StreamToTableJoinScalaIntegrationTestBase
@@ -133,16 +133,16 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val streamsConfiguration: Properties = getStreamsConfiguration()
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
- streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
val builder: StreamsBuilderJ = new StreamsBuilderJ()
val userClicksStream: KStreamJ[String, JLong] =
- builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+ builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
val userRegionsTable: KTableJ[String, String] =
- builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
+ builder.table[String, String](userRegionsTopicJ, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
// Join the stream against the table.
val valueJoinerJ: ValueJoiner[JLong, String, (String, JLong)] =
@@ -150,7 +150,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream.leftJoin(
userRegionsTable,
valueJoinerJ,
- Joined.`with`[String, JLong, String](Serdes.stringSerde, Serdes.javaLongSerde, Serdes.stringSerde)
+ Joined.`with`[String, JLong, String](NewSerdes.stringSerde, NewSerdes.javaLongSerde, NewSerdes.stringSerde)
)
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
@@ -160,11 +160,11 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
- .groupByKey(Grouped.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+ .groupByKey(Grouped.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
.reduce((v1, v2) => v1 + v2)
// Write the (continuously updating) results to the output topic.
- clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+ clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
val streams = new KafkaStreamsJ(builder.build(), streamsConfiguration)
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 1262431..eeed948 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.streams.kstream.{
}
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier}
import org.apache.kafka.streams.scala.ImplicitConversions._
-import org.apache.kafka.streams.scala.serialization.Serdes
+import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KeyValue, StreamsConfig, TopologyDescription, StreamsBuilder => StreamsBuilderJ}
@@ -355,7 +355,7 @@ class TopologyTest {
val builder: StreamsBuilder = new StreamsBuilder
val sourceStream: KStream[String, String] =
- builder.stream(inputTopic)(Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
+ builder.stream(inputTopic)(Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
val mappedStream: KStream[String, String] =
sourceStream.map((k: String, v: String) => (k.toUpperCase(Locale.getDefault), v))
@@ -365,30 +365,30 @@ class TopologyTest {
.process(() => new SimpleProcessor(processorValueCollector))
val stream2 = mappedStream.groupByKey
- .aggregate(0)(aggregator)(Materialized.`with`(Serdes.stringSerde, Serdes.intSerde))
+ .aggregate(0)(aggregator)(Materialized.`with`(NewSerdes.stringSerde, NewSerdes.intSerde))
.toStream
- stream2.to(AGGREGATION_TOPIC)(Produced.`with`(Serdes.stringSerde, Serdes.intSerde))
+ stream2.to(AGGREGATION_TOPIC)(Produced.`with`(NewSerdes.stringSerde, NewSerdes.intSerde))
// adding operators for case where the repartition node is further downstream
val stream3 = mappedStream
.filter((_: String, _: String) => true)
.peek((k: String, v: String) => System.out.println(k + ":" + v))
.groupByKey
- .reduce(reducer)(Materialized.`with`(Serdes.stringSerde, Serdes.stringSerde))
+ .reduce(reducer)(Materialized.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
.toStream
- stream3.to(REDUCE_TOPIC)(Produced.`with`(Serdes.stringSerde, Serdes.stringSerde))
+ stream3.to(REDUCE_TOPIC)(Produced.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
mappedStream
.filter((k: String, _: String) => k == "A")
.join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
- StreamJoined.`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.intSerde)
+ StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde)
)
.to(JOINED_TOPIC)
mappedStream
.filter((k: String, _: String) => k == "A")
.join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
- StreamJoined.`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.stringSerde)
+ StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde)
)
.to(JOINED_TOPIC)
@@ -411,7 +411,7 @@ class TopologyTest {
val builder = new StreamsBuilderJ
- val sourceStream = builder.stream(inputTopic, Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
+ val sourceStream = builder.stream(inputTopic, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
val mappedStream: KStreamJ[String, String] =
sourceStream.map(keyValueMapper)
@@ -421,25 +421,25 @@ class TopologyTest {
.process(processorSupplier)
val stream2: KStreamJ[String, Integer] = mappedStream.groupByKey
- .aggregate(initializer, aggregator, MaterializedJ.`with`(Serdes.stringSerde, SerdesJ.Integer))
+ .aggregate(initializer, aggregator, MaterializedJ.`with`(NewSerdes.stringSerde, SerdesJ.Integer))
.toStream
- stream2.to(AGGREGATION_TOPIC, Produced.`with`(Serdes.stringSerde, SerdesJ.Integer))
+ stream2.to(AGGREGATION_TOPIC, Produced.`with`(NewSerdes.stringSerde, SerdesJ.Integer))
// adding operators for case where the repartition node is further downstream
val stream3 = mappedStream
.filter((_, _) => true)
.peek((k, v) => System.out.println(k + ":" + v))
.groupByKey
- .reduce(reducer, MaterializedJ.`with`(Serdes.stringSerde, Serdes.stringSerde))
+ .reduce(reducer, MaterializedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
.toStream
- stream3.to(REDUCE_TOPIC, Produced.`with`(Serdes.stringSerde, Serdes.stringSerde))
+ stream3.to(REDUCE_TOPIC, Produced.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
mappedStream
.filter((key, _) => key == "A")
.join[Integer, String](stream2,
valueJoiner2,
JoinWindows.of(Duration.ofMillis(5000)),
- StreamJoinedJ.`with`(Serdes.stringSerde, Serdes.stringSerde, SerdesJ.Integer))
+ StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer))
.to(JOINED_TOPIC)
mappedStream
@@ -447,7 +447,7 @@ class TopologyTest {
.join(stream3,
valueJoiner3,
JoinWindows.of(Duration.ofMillis(5000)),
- StreamJoinedJ.`with`(Serdes.stringSerde, Serdes.stringSerde, SerdesJ.String))
+ StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String))
.to(JOINED_TOPIC)
builder
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index bd1b279..16b3493 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -25,7 +25,7 @@ import java.util.regex.Pattern
import org.junit.Assert._
import org.junit._
import org.junit.rules.TemporaryFolder
-import org.apache.kafka.streams.scala.serialization.Serdes
+import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
@@ -144,8 +144,8 @@ class WordCountTest extends WordCountTestData {
import scala.jdk.CollectionConverters._
val streamsConfiguration = getStreamsConfiguration()
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
- streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
val streamBuilder = new StreamsBuilderJ
val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)
@@ -162,7 +162,7 @@ class WordCountTest extends WordCountTestData {
val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
- wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+ wordCounts.toStream.to(outputTopicJ, Produced.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
streams.start()