You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/15 14:57:07 UTC

[kafka] branch trunk updated: KAFKA-7027: Add an overload build method in scala (#6373)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 853f24a  KAFKA-7027: Add an overload build method in scala (#6373)
853f24a is described below

commit 853f24a4a18b26b4e4cc03555673ee951067fa6e
Author: Massimo Siani <ma...@users.noreply.github.com>
AuthorDate: Fri Mar 15 15:56:48 2019 +0100

    KAFKA-7027: Add an overload build method in scala (#6373)
    
    The Java API can pass a Properties object to StreamsBuilder#build (e.g. to enable topology optimization), while the Scala API did not support this yet. The new Scala overload only delegates the work to the underlying Java implementation.
    
    Reviewers: John Roesler <jo...@confluent.io>,  Bill Bejeck <bb...@gmail.com>
---
 .../kafka/streams/scala/StreamsBuilder.scala       |  11 ++
 .../apache/kafka/streams/scala/TopologyTest.scala  | 175 ++++++++++++++++++++-
 2 files changed, 184 insertions(+), 2 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 1fcba48..9c4e65a 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -19,6 +19,7 @@
  */
 package org.apache.kafka.streams.scala
 
+import java.util.Properties
 import java.util.regex.Pattern
 
 import org.apache.kafka.streams.kstream.GlobalKTable
@@ -183,4 +184,14 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
     inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
 
   def build(): Topology = inner.build()
+
+  /**
+   * Returns the `Topology` for the specified processing logic by delegating to the Java builder
+   * with `props`, e.g. `StreamsConfig.TOPOLOGY_OPTIMIZATION` to choose whether to optimize.
+   *
+   * @param props the `Properties` used when building the (possibly optimized) topology
+   * @return the `Topology` that represents the specified processing logic
+   * @see `org.apache.kafka.streams.StreamsBuilder#build(java.util.Properties)`
+   */
+  def build(props: Properties): Topology = inner.build(props)
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 6035dd0..3917552 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -19,20 +19,31 @@
  */
 package org.apache.kafka.streams.scala
 
+import java.time.Duration
+import java.util
+import java.util.{Locale, Properties}
 import java.util.regex.Pattern
 
+import org.apache.kafka.common.serialization.{Serdes => SerdesJ}
 import org.apache.kafka.streams.kstream.{
+  Aggregator,
+  ForeachAction,
+  Initializer,
+  JoinWindows,
   KeyValueMapper,
+  Predicate,
   Reducer,
   Transformer,
   TransformerSupplier,
   ValueJoiner,
   ValueMapper,
+  Joined => JoinedJ,
   KGroupedStream => KGroupedStreamJ,
   KStream => KStreamJ,
-  KTable => KTableJ
+  KTable => KTableJ,
+  Materialized => MaterializedJ
 }
-import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.Serdes._
 import org.apache.kafka.streams.scala.kstream._
@@ -268,4 +279,164 @@ class TopologyTest extends JUnitSuite {
     // should match
     assertEquals(getTopologyScala, getTopologyJava)
   }
+
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaProperties(): Unit = {
+    // Scala and Java DSLs must describe identical topologies for the same build Properties.
+    val props = new Properties()
+    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)
+
+    val propsNoOptimization = new Properties()
+    propsNoOptimization.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION)
+    // output topics shared by the Scala and Java topologies
+    val AGGREGATION_TOPIC = "aggregationTopic"
+    val REDUCE_TOPIC = "reduceTopic"
+    val JOINED_TOPIC = "joinedTopic"
+
+    // build the Scala topology; every call creates and returns a fresh StreamsBuilder
+    def getTopologyScala: StreamsBuilder = {
+
+      val aggregator = (_: String, v: String, agg: Int) => agg + v.length
+      val reducer = (v1: String, v2: String) => v1 + ":" + v2
+      val processorValueCollector: util.List[String] = new util.ArrayList[String]
+
+      val builder: StreamsBuilder = new StreamsBuilder
+
+      val sourceStream: KStream[String, String] =
+        builder.stream(inputTopic)(Consumed.`with`(Serdes.String, Serdes.String))
+
+      val mappedStream: KStream[String, String] =
+        sourceStream.map((k: String, v: String) => (k.toUpperCase(Locale.getDefault), v))
+      mappedStream
+        .filter((k: String, _: String) => k == "B")
+        .mapValues((v: String) => v.toUpperCase(Locale.getDefault))
+        .process(() => new SimpleProcessor(processorValueCollector))
+
+      val stream2 = mappedStream.groupByKey
+        .aggregate(0)(aggregator)(Materialized.`with`(Serdes.String, Serdes.Integer))
+        .toStream
+      stream2.to(AGGREGATION_TOPIC)(Produced.`with`(Serdes.String, Serdes.Integer))
+
+      // adding operators for case where the repartition node is further downstream
+      val stream3 = mappedStream
+        .filter((_: String, _: String) => true)
+        .peek((k: String, v: String) => System.out.println(k + ":" + v))
+        .groupByKey
+        .reduce(reducer)(Materialized.`with`(Serdes.String, Serdes.String))
+        .toStream
+      stream3.to(REDUCE_TOPIC)(Produced.`with`(Serdes.String, Serdes.String))
+
+      mappedStream
+        .filter((k: String, _: String) => k == "A")
+        .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
+          Joined.`with`(Serdes.String, Serdes.String, Serdes.Integer)
+        )
+        .to(JOINED_TOPIC)
+
+      mappedStream
+        .filter((k: String, _: String) => k == "A")
+        .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
+          Joined.`with`(Serdes.String, Serdes.String, Serdes.String)
+        )
+        .to(JOINED_TOPIC)
+
+      builder
+    }
+
+    // build the equivalent Java topology; every call creates and returns a fresh StreamsBuilderJ
+    def getTopologyJava: StreamsBuilderJ = {
+
+      val keyValueMapper: KeyValueMapper[String, String, KeyValue[String, String]] =
+        new KeyValueMapper[String, String, KeyValue[String, String]] {
+          override def apply(key: String, value: String): KeyValue[String, String] =
+            KeyValue.pair(key.toUpperCase(Locale.getDefault), value)
+        }
+      val initializer: Initializer[Integer] = new Initializer[Integer] {
+        override def apply(): Integer = 0
+      }
+      val aggregator: Aggregator[String, String, Integer] = new Aggregator[String, String, Integer] {
+        override def apply(key: String, value: String, aggregate: Integer): Integer = aggregate + value.length
+      }
+      val reducer: Reducer[String] = new Reducer[String] {
+        override def apply(v1: String, v2: String): String = v1 + ":" + v2
+      }
+      val valueMapper: ValueMapper[String, String] = new ValueMapper[String, String] {
+        override def apply(v: String): String = v.toUpperCase(Locale.getDefault)
+      }
+      val processorValueCollector = new util.ArrayList[String]
+      val processorSupplier: ProcessorSupplier[String, String] = new ProcessorSupplier[String, String] {
+        override def get() = new SimpleProcessor(processorValueCollector)
+      }
+      val valueJoiner2: ValueJoiner[String, Integer, String] = new ValueJoiner[String, Integer, String] {
+        override def apply(value1: String, value2: Integer): String = value1 + ":" + value2.toString
+      }
+      val valueJoiner3: ValueJoiner[String, String, String] = new ValueJoiner[String, String, String] {
+        override def apply(value1: String, value2: String): String = value1 + ":" + value2.toString
+      }
+
+      val builder = new StreamsBuilderJ
+
+      val sourceStream = builder.stream(inputTopic, Consumed.`with`(Serdes.String, Serdes.String))
+
+      val mappedStream: KStreamJ[String, String] =
+        sourceStream.map(keyValueMapper)
+      mappedStream
+        .filter(new Predicate[String, String] {
+          override def test(key: String, value: String): Boolean = key == "B"
+        })
+        .mapValues[String](valueMapper)
+        .process(processorSupplier)
+
+      val stream2 = mappedStream.groupByKey
+        .aggregate(initializer, aggregator, MaterializedJ.`with`(Serdes.String, SerdesJ.Integer))
+        .toStream
+      stream2.to(AGGREGATION_TOPIC, Produced.`with`(Serdes.String, SerdesJ.Integer))
+
+      // adding operators for case where the repartition node is further downstream
+      val stream3 = mappedStream
+        .filter(new Predicate[String, String] {
+          override def test(k: String, v: String) = true
+        })
+        .peek(new ForeachAction[String, String] {
+          override def apply(k: String, v: String) = System.out.println(k + ":" + v)
+        })
+        .groupByKey
+        .reduce(reducer, MaterializedJ.`with`(Serdes.String, Serdes.String))
+        .toStream
+      stream3.to(REDUCE_TOPIC, Produced.`with`(Serdes.String, Serdes.String))
+
+      mappedStream
+        .filter(new Predicate[String, String] {
+          override def test(key: String, value: String): Boolean = key == "A"
+        })
+        .join(stream2,
+              valueJoiner2,
+              JoinWindows.of(Duration.ofMillis(5000)),
+              JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.Integer))
+        .to(JOINED_TOPIC)
+
+      mappedStream
+        .filter(new Predicate[String, String] {
+          override def test(key: String, value: String): Boolean = key == "A"
+        })
+        .join(stream3,
+              valueJoiner3,
+              JoinWindows.of(Duration.ofMillis(5000)),
+              JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.String))
+        .to(JOINED_TOPIC)
+
+      builder
+    }
+    // optimized != unoptimized, and Scala matches Java under both settings (fresh builder per build)
+    assertNotEquals(getTopologyScala.build(props).describe.toString,
+                    getTopologyScala.build(propsNoOptimization).describe.toString)
+    assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString,
+                 getTopologyJava.build(propsNoOptimization).describe.toString)
+    assertEquals(getTopologyScala.build(props).describe.toString, getTopologyJava.build(props).describe.toString)
+  }
+  /** Test processor that appends every value it processes to `valueList`; keys are ignored. */
+  private class SimpleProcessor private[TopologyTest] (val valueList: util.List[String])
+      extends AbstractProcessor[String, String] {
+    override def process(key: String, value: String): Unit =
+      valueList.add(value)
+  }
 }