You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/03/07 23:05:44 UTC
[kafka] branch trunk updated: Minor resolve streams scala warnings (#6369)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ccd3af1 Minor resolve streams scala warnings (#6369)
ccd3af1 is described below
commit ccd3af15669d38bc8dba3a376a23e9615faf98a2
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Mar 7 17:05:35 2019 -0600
Minor resolve streams scala warnings (#6369)
Resolves the compiler warnings when building streams-scala.
Reviewers: A. Sophie Blee-Goldman <ab...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>
---
...bleJoinScalaIntegrationTestImplicitSerdes.scala | 8 ++-
.../apache/kafka/streams/scala/TopologyTest.scala | 74 +++++++++++-----------
2 files changed, 44 insertions(+), 38 deletions(-)
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 523418d..f820c3e 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -75,6 +75,9 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
+
+ Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty)
+
streams.close()
}
@@ -115,6 +118,9 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
+
+ Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty)
+
streams.close()
}
@@ -163,7 +169,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
.groupByKey(Grouped.`with`[String, JLong](Serdes.String, Serdes.JavaLong))
.reduce {
new Reducer[JLong] {
- def apply(v1: JLong, v2: JLong) = v1 + v2
+ def apply(v1: JLong, v2: JLong): JLong = v1 + v2
}
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index a826401..6035dd0 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -48,16 +48,16 @@ import _root_.scala.collection.JavaConverters._
*/
class TopologyTest extends JUnitSuite {
- val inputTopic = "input-topic"
- val userClicksTopic = "user-clicks-topic"
- val userRegionsTopic = "user-regions-topic"
+ private val inputTopic = "input-topic"
+ private val userClicksTopic = "user-clicks-topic"
+ private val userRegionsTopic = "user-regions-topic"
- val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
+ private val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
- @Test def shouldBuildIdenticalTopologyInJavaNScalaSimple() = {
+ @Test def shouldBuildIdenticalTopologyInJavaNScalaSimple(): Unit = {
// build the Scala topology
- def getTopologyScala(): TopologyDescription = {
+ def getTopologyScala: TopologyDescription = {
import Serdes._
@@ -71,27 +71,27 @@ class TopologyTest extends JUnitSuite {
}
// build the Java topology
- def getTopologyJava(): TopologyDescription = {
+ def getTopologyJava: TopologyDescription = {
val streamBuilder = new StreamsBuilderJ
val textLines = streamBuilder.stream[String, String](inputTopic)
- val _: KStreamJ[String, String] = textLines.flatMapValues {
+ val _: KStreamJ[String, String] = textLines.flatMapValues(
new ValueMapper[String, java.lang.Iterable[String]] {
def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
}
- }
+ )
streamBuilder.build().describe()
}
// should match
- assertEquals(getTopologyScala(), getTopologyJava())
+ assertEquals(getTopologyScala, getTopologyJava)
}
- @Test def shouldBuildIdenticalTopologyInJavaNScalaAggregate() = {
+ @Test def shouldBuildIdenticalTopologyInJavaNScalaAggregate(): Unit = {
// build the Scala topology
- def getTopologyScala(): TopologyDescription = {
+ def getTopologyScala: TopologyDescription = {
import Serdes._
@@ -101,23 +101,23 @@ class TopologyTest extends JUnitSuite {
val _: KTable[String, Long] =
textLines
.flatMapValues(v => pattern.split(v.toLowerCase))
- .groupBy((k, v) => v)
+ .groupBy((_, v) => v)
.count()
streamBuilder.build().describe()
}
// build the Java topology
- def getTopologyJava(): TopologyDescription = {
+ def getTopologyJava: TopologyDescription = {
val streamBuilder = new StreamsBuilderJ
val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
- val splits: KStreamJ[String, String] = textLines.flatMapValues {
+ val splits: KStreamJ[String, String] = textLines.flatMapValues(
new ValueMapper[String, java.lang.Iterable[String]] {
def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
}
- }
+ )
val grouped: KGroupedStreamJ[String, String] = splits.groupBy {
new KeyValueMapper[String, String, String] {
@@ -125,19 +125,19 @@ class TopologyTest extends JUnitSuite {
}
}
- val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
+ grouped.count()
streamBuilder.build().describe()
}
// should match
- assertEquals(getTopologyScala(), getTopologyJava())
+ assertEquals(getTopologyScala, getTopologyJava)
}
- @Test def shouldBuildIdenticalTopologyInJavaNScalaJoin() = {
+ @Test def shouldBuildIdenticalTopologyInJavaNScalaJoin(): Unit = {
// build the Scala topology
- def getTopologyScala(): TopologyDescription = {
+ def getTopologyScala: TopologyDescription = {
import Serdes._
val builder = new StreamsBuilder()
@@ -146,18 +146,18 @@ class TopologyTest extends JUnitSuite {
val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
- val clicksPerRegion: KTable[String, Long] =
- userClicksStream
- .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
- .map((_, regionWithClicks) => regionWithClicks)
- .groupByKey
- .reduce(_ + _)
+ // clicks per region
+ userClicksStream
+ .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
+ .map((_, regionWithClicks) => regionWithClicks)
+ .groupByKey
+ .reduce(_ + _)
builder.build().describe()
}
// build the Java topology
- def getTopologyJava(): TopologyDescription = {
+ def getTopologyJava: TopologyDescription = {
import java.lang.{Long => JLong}
@@ -190,11 +190,11 @@ class TopologyTest extends JUnitSuite {
}
// Compute the total per region by summing the individual click counts per region.
- val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
+ clicksByRegion
.groupByKey(Grouped.`with`[String, JLong])
.reduce {
new Reducer[JLong] {
- def apply(v1: JLong, v2: JLong) = v1 + v2
+ def apply(v1: JLong, v2: JLong): JLong = v1 + v2
}
}
@@ -202,20 +202,19 @@ class TopologyTest extends JUnitSuite {
}
// should match
- assertEquals(getTopologyScala(), getTopologyJava())
+ assertEquals(getTopologyScala, getTopologyJava)
}
- @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform() = {
+ @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform(): Unit = {
// build the Scala topology
- def getTopologyScala(): TopologyDescription = {
+ def getTopologyScala: TopologyDescription = {
import Serdes._
val streamBuilder = new StreamsBuilder
val textLines = streamBuilder.stream[String, String](inputTopic)
- //noinspection ConvertExpressionToSAM due to 2.11 build
val _: KTable[String, Long] =
textLines
.transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
@@ -229,14 +228,14 @@ class TopologyTest extends JUnitSuite {
override def close(): Unit = Unit
}
})
- .groupBy((k, v) => v)
+ .groupBy((_, v) => v)
.count()
streamBuilder.build().describe()
}
// build the Java topology
- def getTopologyJava(): TopologyDescription = {
+ def getTopologyJava: TopologyDescription = {
val streamBuilder = new StreamsBuilderJ
val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
@@ -260,12 +259,13 @@ class TopologyTest extends JUnitSuite {
}
}
- val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
+ // word counts
+ grouped.count()
streamBuilder.build().describe()
}
// should match
- assertEquals(getTopologyScala(), getTopologyJava())
+ assertEquals(getTopologyScala, getTopologyJava)
}
}