You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2019/12/21 23:40:06 UTC

[kafka] 01/02: KAFKA-9011: Scala bindings for flatTransform and flatTransformValues in KStream (#7520)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit adeb5f7843d8e02aba6012c392957679fb21d20e
Author: Alex Kokachev <al...@zeroaccess.tech>
AuthorDate: Wed Nov 13 02:23:24 2019 +1100

    KAFKA-9011: Scala bindings for flatTransform and flatTransformValues in KStream (#7520)
    
    Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
 .../streams/scala/FunctionsCompatConversions.scala |  38 ++++++
 .../kafka/streams/scala/kstream/KStream.scala      |  57 ++++++++
 .../kafka/streams/scala/kstream/KStreamTest.scala  | 148 ++++++++++++++++++++-
 3 files changed, 242 insertions(+), 1 deletion(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
index 7cd3ac8..26756e0 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
@@ -21,6 +21,8 @@ import org.apache.kafka.streams.kstream._
 import scala.collection.JavaConverters._
 import java.lang.{Iterable => JIterable}
 
+import org.apache.kafka.streams.processor.ProcessorContext
+
 /**
  * Implicit classes that offer conversions of Scala function literals to
  * SAM (Single Abstract Method) objects in Java. These make the Scala APIs much
@@ -120,4 +122,70 @@ private[scala] object FunctionsCompatConversions {
       override def get(): Transformer[K, V, VO] = f()
     }
   }
+
+  /**
+   * Adapts a `TransformerSupplier` whose transformers return a Scala `Iterable` into one whose
+   * transformers return a `java.lang.Iterable`, for use with the Java `KStream#flatTransform`.
+   *
+   * Each call to `get()` obtains exactly one transformer from the wrapped supplier, and `init`,
+   * `transform` and `close` are all forwarded to that same instance, so any state a transformer
+   * sets up in `init` is visible to its `transform` calls.
+   */
+  implicit class TransformerSupplierAsJava[K, V, VO](val supplier: TransformerSupplier[K, V, Iterable[VO]])
+      extends AnyVal {
+    def asJava: TransformerSupplier[K, V, JIterable[VO]] = new TransformerSupplier[K, V, JIterable[VO]] {
+      override def get(): Transformer[K, V, JIterable[VO]] = {
+        val innerTransformer = supplier.get()
+        new Transformer[K, V, JIterable[VO]] {
+          override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava
+          override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
+          override def close(): Unit = innerTransformer.close()
+        }
+      }
+    }
+  }
+
+  /**
+   * Adapts a `ValueTransformerSupplier` whose transformers return a Scala `Iterable` into one whose
+   * transformers return a `java.lang.Iterable`, for use with the Java `KStream#flatTransformValues`.
+   *
+   * Each call to `get()` obtains exactly one transformer from the wrapped supplier, and `init`,
+   * `transform` and `close` are all forwarded to that same instance.
+   */
+  implicit class ValueTransformerSupplierAsJava[V, VO](val supplier: ValueTransformerSupplier[V, Iterable[VO]])
+      extends AnyVal {
+    def asJava: ValueTransformerSupplier[V, JIterable[VO]] = new ValueTransformerSupplier[V, JIterable[VO]] {
+      override def get(): ValueTransformer[V, JIterable[VO]] = {
+        val innerTransformer = supplier.get()
+        new ValueTransformer[V, JIterable[VO]] {
+          override def transform(value: V): JIterable[VO] = innerTransformer.transform(value).asJava
+          override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
+          override def close(): Unit = innerTransformer.close()
+        }
+      }
+    }
+  }
+
+  /**
+   * Adapts a `ValueTransformerWithKeySupplier` whose transformers return a Scala `Iterable` into one whose
+   * transformers return a `java.lang.Iterable`, for use with the Java `KStream#flatTransformValues`.
+   *
+   * Each call to `get()` obtains exactly one transformer from the wrapped supplier, and `init`,
+   * `transform` and `close` are all forwarded to that same instance.
+   */
+  implicit class ValueTransformerSupplierWithKeyAsJava[K, V, VO](
+    val supplier: ValueTransformerWithKeySupplier[K, V, Iterable[VO]]
+  ) extends AnyVal {
+    def asJava: ValueTransformerWithKeySupplier[K, V, JIterable[VO]] =
+      new ValueTransformerWithKeySupplier[K, V, JIterable[VO]] {
+        override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] = {
+          val innerTransformer = supplier.get()
+          new ValueTransformerWithKey[K, V, JIterable[VO]] {
+            override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava
+            override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
+            override def close(): Unit = innerTransformer.close()
+          }
+        }
+      }
+  }
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 9d19418..affa347 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -308,6 +308,63 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.transform(transformerSupplier, stateStoreNames: _*)
 
   /**
+   * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+   * can be altered arbitrarily).
+   * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record
+   * and computes zero or more output records, returned as a Scala `Iterable`.
+   * In order to assign a state, the state must be created and added via `addStateStore` before it can be connected
+   * to the `Transformer`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
+   *
+   * @param transformerSupplier the `TransformerSupplier` that generates `Transformer` instances
+   * @param stateStoreNames     the names of the state stores used by the processor
+   * @return a [[KStream]] with zero or more records per input record (key and value possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#flatTransform`
+   */
+  def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
+                            stateStoreNames: String*): KStream[K1, V1] =
+    inner.flatTransform(transformerSupplier.asJava, stateStoreNames: _*)
+
+  /**
+   * Transform the value of each input record into zero or more records (with possible new type) in the
+   * output stream.
+   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
+   * record value and computes zero or more new values for it, returned as a Scala `Iterable`.
+   * In order to assign a state, the state must be created and added via `addStateStore` before it can be connected
+   * to the `ValueTransformer`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
+   *
+   * @param valueTransformerSupplier a `ValueTransformerSupplier` that generates a `ValueTransformer`
+   * @param stateStoreNames          the names of the state stores used by the processor
+   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#flatTransformValues`
+   */
+  def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
+                              stateStoreNames: String*): KStream[K, VR] =
+    inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*)
+
+  /**
+   * Transform the value of each input record into zero or more records (with possible new type) in the
+   * output stream.
+   * A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each
+   * input record's key and value and computes zero or more new values for it, returned as a Scala `Iterable`.
+   * In order to assign a state, the state must be created and added via `addStateStore` before it can be connected
+   * to the `ValueTransformerWithKey`.
+   * It's not required to connect global state stores that are added via `addGlobalStore`;
+   * read-only access to global state stores is available by default.
+   *
+   * @param valueTransformerSupplier a `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
+   * @param stateStoreNames          the names of the state stores used by the processor
+   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#flatTransformValues`
+   */
+  def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
+                              stateStoreNames: String*): KStream[K, VR] =
+    inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*)
+
+  /**
    * Transform the value of each input record into a new value (with possible new type) of the output record.
    * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
    * record value and computes a new value for it.
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index b3bcfe9..36441a0 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -21,7 +21,17 @@ package org.apache.kafka.streams.scala.kstream
 import java.time.Duration.ofSeconds
 import java.time.Instant
 
-import org.apache.kafka.streams.kstream.JoinWindows
+import org.apache.kafka.streams.KeyValue
+import org.apache.kafka.streams.kstream.{
+  JoinWindows,
+  Transformer,
+  TransformerSupplier,
+  ValueTransformer,
+  ValueTransformerSupplier,
+  ValueTransformerWithKey,
+  ValueTransformerWithKeySupplier
+}
+import org.apache.kafka.streams.processor.ProcessorContext
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.Serdes._
 import org.apache.kafka.streams.scala.StreamsBuilder
@@ -173,4 +183,158 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {

     testDriver.close()
   }
+
+  "transform a KStream" should "transform correctly records" in {
+    class TestTransformer extends Transformer[String, String, KeyValue[String, String]] {
+      override def init(context: ProcessorContext): Unit = {}
+      override def transform(key: String, value: String): KeyValue[String, String] =
+        new KeyValue(s"$key-transformed", s"$value-transformed")
+      override def close(): Unit = {}
+    }
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val stream = builder.stream[String, String](sourceTopic)
+    stream
+      .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
+        def get(): Transformer[String, String, KeyValue[String, String]] =
+          new TestTransformer
+      })
+      .to(sinkTopic)
+
+    val now = Instant.now()
+    val testDriver = createTestDriver(builder, now)
+    val testInput = testDriver.createInput[String, String](sourceTopic)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput.pipeInput("1", "value", now)
+
+    val result = testOutput.readKeyValue()
+    result.value shouldBe "value-transformed"
+    result.key shouldBe "1-transformed"
+
+    testOutput.isEmpty shouldBe true
+
+    testDriver.close()
+  }
+
+  "flatTransform a KStream" should "flatTransform correctly records" in {
+    class TestTransformer extends Transformer[String, String, Iterable[KeyValue[String, String]]] {
+      // Assigned in init(), so the output only contains "transformed" if init() and transform()
+      // are invoked on the same transformer instance.
+      private var suffix: String = _
+      override def init(context: ProcessorContext): Unit = { suffix = "transformed" }
+      override def transform(key: String, value: String): Iterable[KeyValue[String, String]] =
+        Seq(
+          new KeyValue(s"$key-$suffix", s"$value-$suffix"),
+          new KeyValue(s"$key-$suffix-2", s"$value-$suffix-2")
+        )
+      override def close(): Unit = {}
+    }
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val stream = builder.stream[String, String](sourceTopic)
+    stream
+      .flatTransform(new TransformerSupplier[String, String, Iterable[KeyValue[String, String]]] {
+        def get(): Transformer[String, String, Iterable[KeyValue[String, String]]] =
+          new TestTransformer
+      })
+      .to(sinkTopic)
+
+    val now = Instant.now()
+    val testDriver = createTestDriver(builder, now)
+    val testInput = testDriver.createInput[String, String](sourceTopic)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput.pipeInput("1", "value", now)
+
+    // Every record produced for the single input must be forwarded, in order.
+    val first = testOutput.readKeyValue()
+    first.key shouldBe "1-transformed"
+    first.value shouldBe "value-transformed"
+    val second = testOutput.readKeyValue()
+    second.key shouldBe "1-transformed-2"
+    second.value shouldBe "value-transformed-2"
+
+    testOutput.isEmpty shouldBe true
+
+    testDriver.close()
+  }
+
+  "flatTransformValues a KStream" should "correctly flatTransform values in records" in {
+    class TestTransformer extends ValueTransformer[String, Iterable[String]] {
+      // Assigned in init(), so the output only contains "transformed" if init() and transform()
+      // are invoked on the same transformer instance.
+      private var suffix: String = _
+      override def init(context: ProcessorContext): Unit = { suffix = "transformed" }
+      override def transform(value: String): Iterable[String] =
+        Seq(s"$value-$suffix", s"$value-$suffix-2")
+      override def close(): Unit = {}
+    }
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val stream = builder.stream[String, String](sourceTopic)
+    stream
+      .flatTransformValues(new ValueTransformerSupplier[String, Iterable[String]] {
+        def get(): ValueTransformer[String, Iterable[String]] =
+          new TestTransformer
+      })
+      .to(sinkTopic)
+
+    val now = Instant.now()
+    val testDriver = createTestDriver(builder, now)
+    val testInput = testDriver.createInput[String, String](sourceTopic)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput.pipeInput("1", "value", now)
+
+    testOutput.readValue() shouldBe "value-transformed"
+    testOutput.readValue() shouldBe "value-transformed-2"
+
+    testOutput.isEmpty shouldBe true
+
+    testDriver.close()
+  }
+
+  "flatTransformValues with key in a KStream" should "correctly flatTransformValues in records" in {
+    class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] {
+      // Assigned in init(), so the output only contains "transformed" if init() and transform()
+      // are invoked on the same transformer instance.
+      private var suffix: String = _
+      override def init(context: ProcessorContext): Unit = { suffix = "transformed" }
+      override def transform(key: String, value: String): Iterable[String] =
+        Seq(s"$value-$suffix-$key", s"$value-$suffix-$key-2")
+      override def close(): Unit = {}
+    }
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val stream = builder.stream[String, String](sourceTopic)
+    stream
+      .flatTransformValues(new ValueTransformerWithKeySupplier[String, String, Iterable[String]] {
+        def get(): ValueTransformerWithKey[String, String, Iterable[String]] =
+          new TestTransformer
+      })
+      .to(sinkTopic)
+
+    val now = Instant.now()
+    val testDriver = createTestDriver(builder, now)
+    val testInput = testDriver.createInput[String, String](sourceTopic)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput.pipeInput("1", "value", now)
+
+    testOutput.readValue() shouldBe "value-transformed-1"
+    testOutput.readValue() shouldBe "value-transformed-1-2"
+
+    testOutput.isEmpty shouldBe true
+
+    testDriver.close()
+  }
 }