You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/15 14:57:07 UTC
[kafka] branch trunk updated: KAFKA-7027: Add an overload build
method in scala (#6373)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 853f24a KAFKA-7027: Add an overload build method in scala (#6373)
853f24a is described below
commit 853f24a4a18b26b4e4cc03555673ee951067fa6e
Author: Massimo Siani <ma...@users.noreply.github.com>
AuthorDate: Fri Mar 15 15:56:48 2019 +0100
KAFKA-7027: Add an overload build method in scala (#6373)
The Java API lets callers pass a Properties object to StreamsBuilder#build (e.g., to enable topology optimization), but the Scala API does not yet provide this overload. This commit adds it; the new Scala method simply delegates to the underlying Java implementation.
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../kafka/streams/scala/StreamsBuilder.scala | 11 ++
.../apache/kafka/streams/scala/TopologyTest.scala | 175 ++++++++++++++++++++-
2 files changed, 184 insertions(+), 2 deletions(-)
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 1fcba48..9c4e65a 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -19,6 +19,7 @@
*/
package org.apache.kafka.streams.scala
+import java.util.Properties
import java.util.regex.Pattern
import org.apache.kafka.streams.kstream.GlobalKTable
@@ -183,4 +184,14 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
def build(): Topology = inner.build()
+
+ /**
+ * Returns the `Topology` that represents the specified processing logic, built using the given
+ * `Properties` instance, which indicates whether or not to optimize the topology.
+ *
+ * @param props the `Properties` used for building a possibly optimized topology
+ * @return the `Topology` that represents the specified processing logic
+ * @see `org.apache.kafka.streams.StreamsBuilder#build`
+ */
+ def build(props: Properties): Topology = inner.build(props)
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 6035dd0..3917552 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -19,20 +19,31 @@
*/
package org.apache.kafka.streams.scala
+import java.time.Duration
+import java.util
+import java.util.{Locale, Properties}
import java.util.regex.Pattern
+import org.apache.kafka.common.serialization.{Serdes => SerdesJ}
import org.apache.kafka.streams.kstream.{
+ Aggregator,
+ ForeachAction,
+ Initializer,
+ JoinWindows,
KeyValueMapper,
+ Predicate,
Reducer,
Transformer,
TransformerSupplier,
ValueJoiner,
ValueMapper,
+ Joined => JoinedJ,
KGroupedStream => KGroupedStreamJ,
KStream => KStreamJ,
- KTable => KTableJ
+ KTable => KTableJ,
+ Materialized => MaterializedJ
}
-import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream._
@@ -268,4 +279,164 @@ class TopologyTest extends JUnitSuite {
// should match
assertEquals(getTopologyScala, getTopologyJava)
}
+
+ @Test def shouldBuildIdenticalTopologyInJavaNScalaProperties(): Unit = {
+
+ val props = new Properties()
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)
+
+ val propsNoOptimization = new Properties()
+ propsNoOptimization.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION)
+
+ val AGGREGATION_TOPIC = "aggregationTopic"
+ val REDUCE_TOPIC = "reduceTopic"
+ val JOINED_TOPIC = "joinedTopic"
+
+ // build the Scala topology
+ def getTopologyScala: StreamsBuilder = {
+
+ val aggregator = (_: String, v: String, agg: Int) => agg + v.length
+ val reducer = (v1: String, v2: String) => v1 + ":" + v2
+ val processorValueCollector: util.List[String] = new util.ArrayList[String]
+
+ val builder: StreamsBuilder = new StreamsBuilder
+
+ val sourceStream: KStream[String, String] =
+ builder.stream(inputTopic)(Consumed.`with`(Serdes.String, Serdes.String))
+
+ val mappedStream: KStream[String, String] =
+ sourceStream.map((k: String, v: String) => (k.toUpperCase(Locale.getDefault), v))
+ mappedStream
+ .filter((k: String, _: String) => k == "B")
+ .mapValues((v: String) => v.toUpperCase(Locale.getDefault))
+ .process(() => new SimpleProcessor(processorValueCollector))
+
+ val stream2 = mappedStream.groupByKey
+ .aggregate(0)(aggregator)(Materialized.`with`(Serdes.String, Serdes.Integer))
+ .toStream
+ stream2.to(AGGREGATION_TOPIC)(Produced.`with`(Serdes.String, Serdes.Integer))
+
+ // adding operators for case where the repartition node is further downstream
+ val stream3 = mappedStream
+ .filter((_: String, _: String) => true)
+ .peek((k: String, v: String) => System.out.println(k + ":" + v))
+ .groupByKey
+ .reduce(reducer)(Materialized.`with`(Serdes.String, Serdes.String))
+ .toStream
+ stream3.to(REDUCE_TOPIC)(Produced.`with`(Serdes.String, Serdes.String))
+
+ mappedStream
+ .filter((k: String, _: String) => k == "A")
+ .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
+ Joined.`with`(Serdes.String, Serdes.String, Serdes.Integer)
+ )
+ .to(JOINED_TOPIC)
+
+ mappedStream
+ .filter((k: String, _: String) => k == "A")
+ .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
+ Joined.`with`(Serdes.String, Serdes.String, Serdes.String)
+ )
+ .to(JOINED_TOPIC)
+
+ builder
+ }
+
+ // build the Java topology
+ def getTopologyJava: StreamsBuilderJ = {
+
+ val keyValueMapper: KeyValueMapper[String, String, KeyValue[String, String]] =
+ new KeyValueMapper[String, String, KeyValue[String, String]] {
+ override def apply(key: String, value: String): KeyValue[String, String] =
+ KeyValue.pair(key.toUpperCase(Locale.getDefault), value)
+ }
+ val initializer: Initializer[Integer] = new Initializer[Integer] {
+ override def apply(): Integer = 0
+ }
+ val aggregator: Aggregator[String, String, Integer] = new Aggregator[String, String, Integer] {
+ override def apply(key: String, value: String, aggregate: Integer): Integer = aggregate + value.length
+ }
+ val reducer: Reducer[String] = new Reducer[String] {
+ override def apply(v1: String, v2: String): String = v1 + ":" + v2
+ }
+ val valueMapper: ValueMapper[String, String] = new ValueMapper[String, String] {
+ override def apply(v: String): String = v.toUpperCase(Locale.getDefault)
+ }
+ val processorValueCollector = new util.ArrayList[String]
+ val processorSupplier: ProcessorSupplier[String, String] = new ProcessorSupplier[String, String] {
+ override def get() = new SimpleProcessor(processorValueCollector)
+ }
+ val valueJoiner2: ValueJoiner[String, Integer, String] = new ValueJoiner[String, Integer, String] {
+ override def apply(value1: String, value2: Integer): String = value1 + ":" + value2.toString
+ }
+ val valueJoiner3: ValueJoiner[String, String, String] = new ValueJoiner[String, String, String] {
+ override def apply(value1: String, value2: String): String = value1 + ":" + value2.toString
+ }
+
+ val builder = new StreamsBuilderJ
+
+ val sourceStream = builder.stream(inputTopic, Consumed.`with`(Serdes.String, Serdes.String))
+
+ val mappedStream: KStreamJ[String, String] =
+ sourceStream.map(keyValueMapper)
+ mappedStream
+ .filter(new Predicate[String, String] {
+ override def test(key: String, value: String): Boolean = key == "B"
+ })
+ .mapValues[String](valueMapper)
+ .process(processorSupplier)
+
+ val stream2 = mappedStream.groupByKey
+ .aggregate(initializer, aggregator, MaterializedJ.`with`(Serdes.String, SerdesJ.Integer))
+ .toStream
+ stream2.to(AGGREGATION_TOPIC, Produced.`with`(Serdes.String, SerdesJ.Integer))
+
+ // adding operators for case where the repartition node is further downstream
+ val stream3 = mappedStream
+ .filter(new Predicate[String, String] {
+ override def test(k: String, v: String) = true
+ })
+ .peek(new ForeachAction[String, String] {
+ override def apply(k: String, v: String) = System.out.println(k + ":" + v)
+ })
+ .groupByKey
+ .reduce(reducer, MaterializedJ.`with`(Serdes.String, Serdes.String))
+ .toStream
+ stream3.to(REDUCE_TOPIC, Produced.`with`(Serdes.String, Serdes.String))
+
+ mappedStream
+ .filter(new Predicate[String, String] {
+ override def test(key: String, value: String): Boolean = key == "A"
+ })
+ .join(stream2,
+ valueJoiner2,
+ JoinWindows.of(Duration.ofMillis(5000)),
+ JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.Integer))
+ .to(JOINED_TOPIC)
+
+ mappedStream
+ .filter(new Predicate[String, String] {
+ override def test(key: String, value: String): Boolean = key == "A"
+ })
+ .join(stream3,
+ valueJoiner3,
+ JoinWindows.of(Duration.ofMillis(5000)),
+ JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.String))
+ .to(JOINED_TOPIC)
+
+ builder
+ }
+
+ assertNotEquals(getTopologyScala.build(props).describe.toString,
+ getTopologyScala.build(propsNoOptimization).describe.toString)
+ assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString,
+ getTopologyJava.build(propsNoOptimization).describe.toString)
+ assertEquals(getTopologyScala.build(props).describe.toString, getTopologyJava.build(props).describe.toString)
+ }
+
+ private class SimpleProcessor private[TopologyTest] (val valueList: util.List[String])
+ extends AbstractProcessor[String, String] {
+ override def process(key: String, value: String): Unit =
+ valueList.add(value)
+ }
}