You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/03/07 23:05:44 UTC

[kafka] branch trunk updated: Minor resolve streams scala warnings (#6369)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ccd3af1  Minor resolve streams scala warnings (#6369)
ccd3af1 is described below

commit ccd3af15669d38bc8dba3a376a23e9615faf98a2
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Mar 7 17:05:35 2019 -0600

    Minor resolve streams scala warnings (#6369)
    
    Resolves the compiler warnings when building streams-scala.
    
    Reviewers: A. Sophie Blee-Goldman <ab...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>
---
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |  8 ++-
 .../apache/kafka/streams/scala/TopologyTest.scala  | 74 +++++++++++-----------
 2 files changed, 44 insertions(+), 38 deletions(-)

diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 523418d..f820c3e 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -75,6 +75,9 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
 
     val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
       produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
+
+    Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty)
+
     streams.close()
   }
 
@@ -115,6 +118,9 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
 
     val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
       produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
+
+    Assert.assertTrue("Expected to process some data", !actualClicksPerRegion.isEmpty)
+
     streams.close()
   }
 
@@ -163,7 +169,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
       .groupByKey(Grouped.`with`[String, JLong](Serdes.String, Serdes.JavaLong))
       .reduce {
         new Reducer[JLong] {
-          def apply(v1: JLong, v2: JLong) = v1 + v2
+          def apply(v1: JLong, v2: JLong): JLong = v1 + v2
         }
       }
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index a826401..6035dd0 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -48,16 +48,16 @@ import _root_.scala.collection.JavaConverters._
  */
 class TopologyTest extends JUnitSuite {
 
-  val inputTopic = "input-topic"
-  val userClicksTopic = "user-clicks-topic"
-  val userRegionsTopic = "user-regions-topic"
+  private val inputTopic = "input-topic"
+  private val userClicksTopic = "user-clicks-topic"
+  private val userRegionsTopic = "user-regions-topic"
 
-  val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
+  private val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
 
-  @Test def shouldBuildIdenticalTopologyInJavaNScalaSimple() = {
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaSimple(): Unit = {
 
     // build the Scala topology
-    def getTopologyScala(): TopologyDescription = {
+    def getTopologyScala: TopologyDescription = {
 
       import Serdes._
 
@@ -71,27 +71,27 @@ class TopologyTest extends JUnitSuite {
     }
 
     // build the Java topology
-    def getTopologyJava(): TopologyDescription = {
+    def getTopologyJava: TopologyDescription = {
 
       val streamBuilder = new StreamsBuilderJ
       val textLines = streamBuilder.stream[String, String](inputTopic)
 
-      val _: KStreamJ[String, String] = textLines.flatMapValues {
+      val _: KStreamJ[String, String] = textLines.flatMapValues(
         new ValueMapper[String, java.lang.Iterable[String]] {
           def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
         }
-      }
+      )
       streamBuilder.build().describe()
     }
 
     // should match
-    assertEquals(getTopologyScala(), getTopologyJava())
+    assertEquals(getTopologyScala, getTopologyJava)
   }
 
-  @Test def shouldBuildIdenticalTopologyInJavaNScalaAggregate() = {
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaAggregate(): Unit = {
 
     // build the Scala topology
-    def getTopologyScala(): TopologyDescription = {
+    def getTopologyScala: TopologyDescription = {
 
       import Serdes._
 
@@ -101,23 +101,23 @@ class TopologyTest extends JUnitSuite {
       val _: KTable[String, Long] =
         textLines
           .flatMapValues(v => pattern.split(v.toLowerCase))
-          .groupBy((k, v) => v)
+          .groupBy((_, v) => v)
           .count()
 
       streamBuilder.build().describe()
     }
 
     // build the Java topology
-    def getTopologyJava(): TopologyDescription = {
+    def getTopologyJava: TopologyDescription = {
 
       val streamBuilder = new StreamsBuilderJ
       val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
 
-      val splits: KStreamJ[String, String] = textLines.flatMapValues {
+      val splits: KStreamJ[String, String] = textLines.flatMapValues(
         new ValueMapper[String, java.lang.Iterable[String]] {
           def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
         }
-      }
+      )
 
       val grouped: KGroupedStreamJ[String, String] = splits.groupBy {
         new KeyValueMapper[String, String, String] {
@@ -125,19 +125,19 @@ class TopologyTest extends JUnitSuite {
         }
       }
 
-      val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
+      grouped.count()
 
       streamBuilder.build().describe()
     }
 
     // should match
-    assertEquals(getTopologyScala(), getTopologyJava())
+    assertEquals(getTopologyScala, getTopologyJava)
   }
 
-  @Test def shouldBuildIdenticalTopologyInJavaNScalaJoin() = {
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaJoin(): Unit = {
 
     // build the Scala topology
-    def getTopologyScala(): TopologyDescription = {
+    def getTopologyScala: TopologyDescription = {
       import Serdes._
 
       val builder = new StreamsBuilder()
@@ -146,18 +146,18 @@ class TopologyTest extends JUnitSuite {
 
       val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
 
-      val clicksPerRegion: KTable[String, Long] =
-        userClicksStream
-          .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
-          .map((_, regionWithClicks) => regionWithClicks)
-          .groupByKey
-          .reduce(_ + _)
+      // clicks per region
+      userClicksStream
+        .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
+        .map((_, regionWithClicks) => regionWithClicks)
+        .groupByKey
+        .reduce(_ + _)
 
       builder.build().describe()
     }
 
     // build the Java topology
-    def getTopologyJava(): TopologyDescription = {
+    def getTopologyJava: TopologyDescription = {
 
       import java.lang.{Long => JLong}
 
@@ -190,11 +190,11 @@ class TopologyTest extends JUnitSuite {
         }
 
       // Compute the total per region by summing the individual click counts per region.
-      val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
+      clicksByRegion
         .groupByKey(Grouped.`with`[String, JLong])
         .reduce {
           new Reducer[JLong] {
-            def apply(v1: JLong, v2: JLong) = v1 + v2
+            def apply(v1: JLong, v2: JLong): JLong = v1 + v2
           }
         }
 
@@ -202,20 +202,19 @@ class TopologyTest extends JUnitSuite {
     }
 
     // should match
-    assertEquals(getTopologyScala(), getTopologyJava())
+    assertEquals(getTopologyScala, getTopologyJava)
   }
 
-  @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform() = {
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform(): Unit = {
 
     // build the Scala topology
-    def getTopologyScala(): TopologyDescription = {
+    def getTopologyScala: TopologyDescription = {
 
       import Serdes._
 
       val streamBuilder = new StreamsBuilder
       val textLines = streamBuilder.stream[String, String](inputTopic)
 
-      //noinspection ConvertExpressionToSAM due to 2.11 build
       val _: KTable[String, Long] =
         textLines
           .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
@@ -229,14 +228,14 @@ class TopologyTest extends JUnitSuite {
                 override def close(): Unit = Unit
               }
           })
-          .groupBy((k, v) => v)
+          .groupBy((_, v) => v)
           .count()
 
       streamBuilder.build().describe()
     }
 
     // build the Java topology
-    def getTopologyJava(): TopologyDescription = {
+    def getTopologyJava: TopologyDescription = {
 
       val streamBuilder = new StreamsBuilderJ
       val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
@@ -260,12 +259,13 @@ class TopologyTest extends JUnitSuite {
         }
       }
 
-      val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
+      // word counts
+      grouped.count()
 
       streamBuilder.build().describe()
     }
 
     // should match
-    assertEquals(getTopologyScala(), getTopologyJava())
+    assertEquals(getTopologyScala, getTopologyJava)
   }
 }