You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/07 03:55:23 UTC

[kafka] branch trunk updated: MINOR: Build and code sample updates for Kafka Streams DSL for Scala (#4949)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 893e044  MINOR: Build and code sample updates for Kafka Streams DSL for Scala (#4949)
893e044 is described below

commit 893e0445150614d3538654af0e25f78d87a717ba
Author: Sean Glover <se...@randonom.com>
AuthorDate: Sun May 6 23:55:12 2018 -0400

    MINOR: Build and code sample updates for Kafka Streams DSL for Scala (#4949)
    
    Several build and documentation updates were required after the merge of KAFKA-6670: Implement a Scala wrapper library for Kafka Streams.
    
    Encode Scala major version into streams-scala artifacts.
    To differentiate versions of the kafka-streams-scala artifact across Scala major versions, it's required to encode the version into the artifact name before it's published to a Maven repository. This is accomplished by following a release process similar to that of Kafka core, which encodes the Scala major version and then runs the build for each major version of Scala supported. This is considered standard practice when releasing Scala libraries, but is not handled for us automatically with th [...]
    
    After this change you can generate and install the kafka-streams-scala artifact into the local maven repository:
    
    $ ./gradlew -PscalaVersion=2.11 install
    $ ./gradlew -PscalaVersion=2.12 install
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>
---
 build.gradle                                               |  3 +--
 docs/streams/developer-guide/dsl-api.html                  | 14 ++++++++------
 docs/streams/index.html                                    |  7 ++++---
 ...reamToTableJoinScalaIntegrationTestImplicitSerdes.scala |  3 +--
 .../org/apache/kafka/streams/scala/TopologyTest.scala      |  3 +--
 .../org/apache/kafka/streams/scala/WordCountTest.scala     |  3 +--
 6 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/build.gradle b/build.gradle
index d60ca8f..31026d9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -981,7 +981,7 @@ project(':streams') {
 project(':streams:streams-scala') {
   println "Building project 'streams-scala' with Scala version ${versions.scala}"
   apply plugin: 'scala'
-  archivesBaseName = "kafka-streams-scala"
+  archivesBaseName = "kafka-streams-scala_${versions.baseScala}"
 
   dependencies {
     compile project(':streams')
@@ -992,7 +992,6 @@ project(':streams:streams-scala') {
     testCompile project(':core').sourceSets.test.output
     testCompile project(':streams').sourceSets.test.output
     testCompile project(':clients').sourceSets.test.output
-    testCompile libs.scalaLogging
 
     testCompile libs.junit
     testCompile libs.scalatest
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index ce60654..2b25072 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3191,11 +3191,13 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit
 
 import org.apache.kafka.streams.kstream.Materialized
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala._
+import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
 object WordCountApplication extends App {
   import DefaultSerdes._
-  import ImplicitConversions._
 
   val config: Properties = {
     val p = new Properties()
@@ -3204,9 +3206,9 @@ object WordCountApplication extends App {
     p
   }
 
-  val builder = new StreamsBuilder()
-  val textLines = builder.stream[String, String]("TextLinesTopic")
-  val wordCounts = textLines
+  val builder: StreamsBuilder = new StreamsBuilder
+  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
+  val wordCounts: KTable[String, Long] = textLines
     .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
     .groupBy((_, word) => word)
     .count(Materialized.as("counts-store"))
@@ -3216,7 +3218,7 @@ object WordCountApplication extends App {
   streams.start()
 
   sys.ShutdownHookThread {
-    streams.close(10, TimeUnit.SECONDS)
+     streams.close(10, TimeUnit.SECONDS)
   }
 }
               </pre>
@@ -3290,7 +3292,7 @@ val clicksPerRegion: KTable[String, Long] =
    // Join the stream against the table.
    .leftJoin(userRegionsTable, (clicks: UserClicks, region: String) => (if (region == null) "UNKNOWN" else region, clicks.clicks))
 
-   // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+   // Change the stream from &lt;user&gt; -&gt; &lt;region, clicks&gt; to &lt;region&gt; -&gt; &lt;clicks&gt;
    .map((_, regionWithClicks) => regionWithClicks)
 
    // Compute the total per region by summing the individual click counts per region.
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 8992fc5..72e1323 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -255,12 +255,13 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit
 
 import org.apache.kafka.streams.kstream.Materialized
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala._
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
 object WordCountApplication extends App {
   import DefaultSerdes._
-  import ImplicitConversions._
 
   val config: Properties = {
     val p = new Properties()
@@ -269,7 +270,7 @@ object WordCountApplication extends App {
     p
   }
 
-  val builder: StreamsBuilder = new StreamsBuilder()
+  val builder: StreamsBuilder = new StreamsBuilder
   val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
   val wordCounts: KTable[String, Long] = textLines
     .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
@@ -281,7 +282,7 @@ object WordCountApplication extends App {
   streams.start()
 
   sys.ShutdownHookThread {
-    streams.close(10, TimeUnit.SECONDS)
+     streams.close(10, TimeUnit.SECONDS)
   }
 }
                </pre>
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 24974c4..e701431 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.streams._
 import org.apache.kafka.streams.scala.kstream._
 
 import ImplicitConversions._
-import com.typesafe.scalalogging.LazyLogging
 
 /**
  * Test suite that does an example to demonstrate stream-table joins in Kafka Streams
@@ -46,7 +45,7 @@ import com.typesafe.scalalogging.LazyLogging
  * Hence the native Java API based version is more verbose.
  */ 
 class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
-  with StreamToTableJoinTestData with LazyLogging {
+  with StreamToTableJoinTestData {
 
   private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 89b2935..71d4834 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.common.serialization._
 
 import ImplicitConversions._
-import com.typesafe.scalalogging.LazyLogging
 
 import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ, _}
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
@@ -40,7 +39,7 @@ import collection.JavaConverters._
 /**
  * Test suite that verifies that the topology built by the Java and Scala APIs match.
  */ 
-class TopologyTest extends JUnitSuite with LazyLogging {
+class TopologyTest extends JUnitSuite {
 
   val inputTopic = "input-topic"
   val userClicksTopic = "user-clicks-topic"
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index f71e0cb..e827a3c 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -40,7 +40,6 @@ import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.test.TestUtils
 
 import ImplicitConversions._
-import com.typesafe.scalalogging.LazyLogging
 
 /**
  * Test suite that does a classic word count example.
@@ -51,7 +50,7 @@ import com.typesafe.scalalogging.LazyLogging
  * Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11.
  * Hence the native Java API based version is more verbose.
  */ 
-class WordCountTest extends JUnitSuite with WordCountTestData with LazyLogging {
+class WordCountTest extends JUnitSuite with WordCountTestData {
 
   private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.