You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2019/12/21 23:40:06 UTC
[kafka] 01/02: KAFKA-9011: Scala bindings for flatTransform and
flatTransformValues in KStream (#7520)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit adeb5f7843d8e02aba6012c392957679fb21d20e
Author: Alex Kokachev <al...@zeroaccess.tech>
AuthorDate: Wed Nov 13 02:23:24 2019 +1100
KAFKA-9011: Scala bindings for flatTransform and flatTransformValues in KStream (#7520)
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../streams/scala/FunctionsCompatConversions.scala | 38 ++++++
.../kafka/streams/scala/kstream/KStream.scala | 57 ++++++++
.../kafka/streams/scala/kstream/KStreamTest.scala | 148 ++++++++++++++++++++-
3 files changed, 242 insertions(+), 1 deletion(-)
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
index 7cd3ac8..26756e0 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
@@ -21,6 +21,8 @@ import org.apache.kafka.streams.kstream._
import scala.collection.JavaConverters._
import java.lang.{Iterable => JIterable}
+import org.apache.kafka.streams.processor.ProcessorContext
+
/**
* Implicit classes that offer conversions of Scala function literals to
* SAM (Single Abstract Method) objects in Java. These make the Scala APIs much
@@ -120,4 +122,40 @@ private[scala] object FunctionsCompatConversions {
override def get(): Transformer[K, V, VO] = f()
}
}
+
+  /**
+   * Adapts a `TransformerSupplier` whose transformers return a Scala `Iterable` to one whose transformers return a
+   * `java.lang.Iterable`, as required by the Java `KStream#flatTransform`.
+   */
+  implicit class TransformerSupplierAsJava[K, V, VO](val supplier: TransformerSupplier[K, V, Iterable[VO]])
+      extends AnyVal {
+    def asJava: TransformerSupplier[K, V, JIterable[VO]] = new TransformerSupplier[K, V, JIterable[VO]] {
+      // Obtain the Scala transformer once per get() so that init(), transform() and close() are all forwarded to
+      // the same instance. Calling supplier.get() inside each method (suppliers normally create a new instance per
+      // call) would leave state set up in init() invisible to transform() and would close an uninitialised object.
+      override def get(): Transformer[K, V, JIterable[VO]] = {
+        val delegate = supplier.get()
+        new Transformer[K, V, JIterable[VO]] {
+          override def init(context: ProcessorContext): Unit = delegate.init(context)
+          override def transform(key: K, value: V): JIterable[VO] = delegate.transform(key, value).asJava
+          override def close(): Unit = delegate.close()
+        }
+      }
+    }
+  }
+  /**
+   * Adapts a `ValueTransformerSupplier` whose transformers return a Scala `Iterable` to one whose transformers
+   * return a `java.lang.Iterable`, as required by the Java `KStream#flatTransformValues`.
+   */
+  implicit class ValueTransformerSupplierAsJava[V, VO](val supplier: ValueTransformerSupplier[V, Iterable[VO]])
+      extends AnyVal {
+    def asJava: ValueTransformerSupplier[V, JIterable[VO]] = new ValueTransformerSupplier[V, JIterable[VO]] {
+      // One delegate per get(): init(), transform() and close() must all reach the same Scala transformer,
+      // otherwise each call could be served by a different, freshly supplied instance.
+      override def get(): ValueTransformer[V, JIterable[VO]] = {
+        val delegate = supplier.get()
+        new ValueTransformer[V, JIterable[VO]] {
+          override def init(context: ProcessorContext): Unit = delegate.init(context)
+          override def transform(value: V): JIterable[VO] = delegate.transform(value).asJava
+          override def close(): Unit = delegate.close()
+        }
+      }
+    }
+  }
+  /**
+   * Adapts a `ValueTransformerWithKeySupplier` whose transformers return a Scala `Iterable` to one whose
+   * transformers return a `java.lang.Iterable`, as required by the Java `KStream#flatTransformValues`.
+   */
+  implicit class ValueTransformerSupplierWithKeyAsJava[K, V, VO](
+    val supplier: ValueTransformerWithKeySupplier[K, V, Iterable[VO]]
+  ) extends AnyVal {
+    def asJava: ValueTransformerWithKeySupplier[K, V, JIterable[VO]] =
+      new ValueTransformerWithKeySupplier[K, V, JIterable[VO]] {
+        // One delegate per get(): init(), transform() and close() must all reach the same Scala transformer,
+        // otherwise each call could be served by a different, freshly supplied instance.
+        override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] = {
+          val delegate = supplier.get()
+          new ValueTransformerWithKey[K, V, JIterable[VO]] {
+            override def init(context: ProcessorContext): Unit = delegate.init(context)
+            override def transform(key: K, value: V): JIterable[VO] = delegate.transform(key, value).asJava
+            override def close(): Unit = delegate.close()
+          }
+        }
+      }
+  }
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 9d19418..affa347 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -308,6 +308,63 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.transform(transformerSupplier, stateStoreNames: _*)
/**
+   * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+   * can be altered arbitrarily).
+   * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record
+   * and computes zero or more output records, returned as a Scala `Iterable`.
+   * In order to assign a state store, the state store must be created and added via `addStateStore` before it can be
+   * connected to the `Transformer`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
+   *
+   * @tparam K1 the key type of the result stream
+   * @tparam V1 the value type of the result stream
+   * @param transformerSupplier the `TransformerSupplier` that generates `Transformer` instances
+   * @param stateStoreNames     the names of the state stores used by the processor
+   * @return a [[KStream]] that contains zero or more records per input record, with new key and value
+   *         (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#flatTransform`
+   */
+  def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
+                            stateStoreNames: String*): KStream[K1, V1] =
+    inner.flatTransform[K1, V1](transformerSupplier.asJava, stateStoreNames: _*)
+
+  /**
+   * Transform the value of each input record into zero or more records (with possible new type) in the
+   * output stream.
+   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
+   * record value and computes zero or more new values for it, returned as a Scala `Iterable`; each new value
+   * is emitted with the unmodified key of the input record.
+   * In order to assign a state store, the state store must be created and added via `addStateStore` before it can be
+   * connected to the `ValueTransformer`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
+   *
+   * @tparam VR the value type of the result stream
+   * @param valueTransformerSupplier an instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
+   * @param stateStoreNames          the names of the state stores used by the processor
+   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#flatTransformValues`
+   */
+  def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
+                              stateStoreNames: String*): KStream[K, VR] =
+    inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*)
+
+  /**
+   * Transform the value of each input record into zero or more records (with possible new type) in the
+   * output stream.
+   * A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input
+   * record, receiving both its key and its value, and computes zero or more new values for it, returned as a
+   * Scala `Iterable`; each new value is emitted with the unmodified key of the input record.
+   * In order to assign a state store, the state store must be created and added via `addStateStore` before it can be
+   * connected to the `ValueTransformerWithKey`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
+   *
+   * @tparam VR the value type of the result stream
+   * @param valueTransformerSupplier an instance of `ValueTransformerWithKeySupplier` that generates a
+   *                                 `ValueTransformerWithKey`
+   * @param stateStoreNames          the names of the state stores used by the processor
+   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#flatTransformValues`
+   */
+  def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
+                              stateStoreNames: String*): KStream[K, VR] =
+    inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*)
+
+ /**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
* record value and computes a new value for it.
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index b3bcfe9..36441a0 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -21,7 +21,17 @@ package org.apache.kafka.streams.scala.kstream
import java.time.Duration.ofSeconds
import java.time.Instant
-import org.apache.kafka.streams.kstream.JoinWindows
+import org.apache.kafka.streams.KeyValue
+import org.apache.kafka.streams.kstream.{
+ JoinWindows,
+ Transformer,
+ TransformerSupplier,
+ ValueTransformer,
+ ValueTransformerSupplier,
+ ValueTransformerWithKey,
+ ValueTransformerWithKeySupplier
+}
+import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
@@ -173,4 +183,140 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testDriver.close()
}
+
+  "transform a KStream" should "transform correctly records" in {
+    // One-to-one transformer: suffixes both key and value.
+    class TestTransformer extends Transformer[String, String, KeyValue[String, String]] {
+      override def init(context: ProcessorContext): Unit = {}
+      override def transform(key: String, value: String): KeyValue[String, String] =
+        new KeyValue(s"$key-transformed", s"$value-transformed")
+      override def close(): Unit = {}
+    }
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val builder = new StreamsBuilder()
+
+    val supplier = new TransformerSupplier[String, String, KeyValue[String, String]] {
+      def get(): Transformer[String, String, KeyValue[String, String]] = new TestTransformer
+    }
+    builder.stream[String, String](sourceTopic).transform(supplier).to(sinkTopic)
+
+    val now = Instant.now()
+    val testDriver = createTestDriver(builder, now)
+    val testInput = testDriver.createInput[String, String](sourceTopic)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput.pipeInput("1", "value", now)
+
+    val record = testOutput.readKeyValue()
+    record.key shouldBe "1-transformed"
+    record.value shouldBe "value-transformed"
+    testOutput.isEmpty shouldBe true
+
+    testDriver.close()
+  }
+
+  "flatTransform a KStream" should "flatTransform correctly records" in {
+    // Emits two records per input so the test exercises the one-to-many contract of flatTransform
+    // (a single output record would be indistinguishable from a plain transform).
+    class TestTransformer extends Transformer[String, String, Iterable[KeyValue[String, String]]] {
+      override def init(context: ProcessorContext): Unit = {}
+      override def transform(key: String, value: String): Iterable[KeyValue[String, String]] =
+        Seq(
+          new KeyValue(s"$key-transformed", s"$value-transformed"),
+          new KeyValue(s"$key-transformed-again", s"$value-transformed-again")
+        )
+      override def close(): Unit = {}
+    }
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val stream = builder.stream[String, String](sourceTopic)
+    stream
+      .flatTransform(new TransformerSupplier[String, String, Iterable[KeyValue[String, String]]] {
+        def get(): Transformer[String, String, Iterable[KeyValue[String, String]]] =
+          new TestTransformer
+      })
+      .to(sinkTopic)
+
+    val now = Instant.now()
+    val testDriver = createTestDriver(builder, now)
+    val testInput = testDriver.createInput[String, String](sourceTopic)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput.pipeInput("1", "value", now)
+
+    // Records must come out in the order the transformer returned them.
+    val first = testOutput.readKeyValue()
+    first.key shouldBe "1-transformed"
+    first.value shouldBe "value-transformed"
+
+    val second = testOutput.readKeyValue()
+    second.key shouldBe "1-transformed-again"
+    second.value shouldBe "value-transformed-again"
+
+    testOutput.isEmpty shouldBe true
+
+    testDriver.close()
+  }
+
+  "flatTransformValues a KStream" should "correctly flatTransform values in records" in {
+    // Emits two values per input so the test exercises the one-to-many contract of flatTransformValues.
+    class TestTransformer extends ValueTransformer[String, Iterable[String]] {
+      override def init(context: ProcessorContext): Unit = {}
+      override def transform(value: String): Iterable[String] =
+        Seq(s"$value-transformed", s"$value-transformed-again")
+      override def close(): Unit = {}
+    }
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val stream = builder.stream[String, String](sourceTopic)
+    stream
+      .flatTransformValues(new ValueTransformerSupplier[String, Iterable[String]] {
+        def get(): ValueTransformer[String, Iterable[String]] =
+          new TestTransformer
+      })
+      .to(sinkTopic)
+
+    val now = Instant.now()
+    val testDriver = createTestDriver(builder, now)
+    val testInput = testDriver.createInput[String, String](sourceTopic)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput.pipeInput("1", "value", now)
+
+    // Every emitted value keeps the input key.
+    val first = testOutput.readKeyValue()
+    first.key shouldBe "1"
+    first.value shouldBe "value-transformed"
+
+    val second = testOutput.readKeyValue()
+    second.key shouldBe "1"
+    second.value shouldBe "value-transformed-again"
+
+    testOutput.isEmpty shouldBe true
+
+    testDriver.close()
+  }
+
+  "flatTransformValues with key in a KStream" should "correctly flatTransformValues in records" in {
+    // Emits two key-dependent values per input so the test exercises both key access and the
+    // one-to-many contract of flatTransformValues.
+    class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] {
+      override def init(context: ProcessorContext): Unit = {}
+      override def transform(key: String, value: String): Iterable[String] =
+        Seq(s"$value-transformed-$key", s"$value-transformed-again-$key")
+      override def close(): Unit = {}
+    }
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val stream = builder.stream[String, String](sourceTopic)
+    stream
+      .flatTransformValues(new ValueTransformerWithKeySupplier[String, String, Iterable[String]] {
+        def get(): ValueTransformerWithKey[String, String, Iterable[String]] =
+          new TestTransformer
+      })
+      .to(sinkTopic)
+
+    val now = Instant.now()
+    val testDriver = createTestDriver(builder, now)
+    val testInput = testDriver.createInput[String, String](sourceTopic)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput.pipeInput("1", "value", now)
+
+    // Every emitted value keeps the input key.
+    val first = testOutput.readKeyValue()
+    first.key shouldBe "1"
+    first.value shouldBe "value-transformed-1"
+
+    val second = testOutput.readKeyValue()
+    second.key shouldBe "1"
+    second.value shouldBe "value-transformed-again-1"
+
+    testOutput.isEmpty shouldBe true
+
+    testDriver.close()
+  }
}