You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/04/16 16:10:56 UTC

[kafka] branch trunk updated: KAFKA-7875: Add KStream.flatTransformValues (#6424)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 05668e9  KAFKA-7875: Add KStream.flatTransformValues (#6424)
05668e9 is described below

commit 05668e98f531cf4d6ddb0696f0f72675ca128581
Author: cadonna <br...@confluent.io>
AuthorDate: Tue Apr 16 09:10:38 2019 -0700

    KAFKA-7875: Add KStream.flatTransformValues (#6424)
    
    Adds flatTransformValues methods in KStream
    Adds processor supplier and processor for flatTransformValues
    Improves API documentation of transformValues
    
    Reviewers: Matthias J. Sax <mj...@apache.org>,  John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 225 +++++++++++++++++--
 .../internals/KStreamFlatTransformValues.java      |  70 ++++++
 .../streams/kstream/internals/KStreamImpl.java     |  39 +++-
 .../KStreamTransformIntegrationTest.java           | 241 +++++++++++++++++----
 .../internals/KStreamFlatTransformValuesTest.java  | 135 ++++++++++++
 .../streams/kstream/internals/KStreamImplTest.java |  34 ++-
 6 files changed, 679 insertions(+), 65 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 7faba82..8375336 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -263,6 +263,8 @@ public interface KStream<K, V> {
      * @see #flatTransform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #flatTransformValues(ValueTransformerSupplier, String...)
+     * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
      */
     <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 
@@ -304,6 +306,8 @@ public interface KStream<K, V> {
      * @see #flatTransform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #flatTransformValues(ValueTransformerSupplier, String...)
+     * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
      */
     <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper);
 
@@ -351,6 +355,8 @@ public interface KStream<K, V> {
      * @see #flatTransform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #flatTransformValues(ValueTransformerSupplier, String...)
+     * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
      */
     <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper);
 
@@ -627,7 +633,7 @@ public interface KStream<K, V> {
      *             Iterable<KeyValue> transform(K key, V value) {
      *                 // can access this.state
      *                 List<KeyValue> result = new ArrayList<>();
-     *                 for (int i = 0; i < n; i++) {
+     *                 for (int i = 0; i < 3; i++) {
      *                     result.add(new KeyValue(key, value));
      *                 }
      *                 return result; // emits a list of key-value pairs via return
@@ -672,7 +678,7 @@ public interface KStream<K, V> {
                                            final String... stateStoreNames);
 
     /**
-     * Transform the value of each input record into a new value (with possible new type) of the output record.
+     * Transform the value of each input record into a new value (with possibly a new type) of the output record.
      * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
      * record value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -680,8 +686,8 @@ public interface KStream<K, V> {
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
      * can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is available by default):
+     * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+     * connect global state stores; read-only access to global state stores is available by default):
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -693,12 +699,16 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
      * }</pre>
-     * Within the {@link ValueTransformer}, the state is obtained via the
-     * {@link ProcessorContext}.
+     * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
+     * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
+     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is
+     * {@code null}, no records are emitted.
      * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
-     * pairs should be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+     * emit a {@link KeyValue} pair.
      * <pre>{@code
      * new ValueTransformerSupplier() {
      *     ValueTransformer get() {
@@ -724,7 +734,8 @@ public interface KStream<K, V> {
      * }
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transformValues()}.
      * <p>
      * Setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
@@ -743,7 +754,7 @@ public interface KStream<K, V> {
                                         final String... stateStoreNames);
 
     /**
-     * Transform the value of each input record into a new value (with possible new type) of the output record.
+     * Transform the value of each input record into a new value (with possibly a new type) of the output record.
      * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
      * each input record value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -751,8 +762,8 @@ public interface KStream<K, V> {
      * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
      * can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
-     * global state stores; read-only access to global state stores is available by default):
+     * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+     * connect global state stores; read-only access to global state stores is available by default):
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -764,13 +775,18 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
      * }</pre>
-     * Within the {@link ValueTransformerWithKey}, the state is obtained via the
-     * {@link ProcessorContext}.
+     * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
-     * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
-     * pairs should be emitted via {@link ProcessorContext#forward(Object, Object)
-     * ProcessorContext.forward()}.
+     * The {@link ValueTransformerWithKey} must return the new value in
+     * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
+     * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
+     * is {@code null}, no records are emitted.
+     * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
+     * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
+     * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
+     * to emit a {@link KeyValue} pair.
      * <pre>{@code
      * new ValueTransformerWithKeySupplier() {
      *     ValueTransformerWithKey get() {
@@ -796,7 +812,8 @@ public interface KStream<K, V> {
      * }
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transformValues()}.
      * <p>
      * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
      * So, setting a new value preserves data co-location with respect to the key.
@@ -816,6 +833,180 @@ public interface KStream<K, V> {
                                         final String... stateStoreNames);
 
     /**
+     * Transform the value of each input record into zero or more new values (with possibly a new
+     * type) and emit for each new value a record with the same key as the input record and the new value.
+     * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+     * record value and computes zero or more new values.
+     * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+     * This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapper) flatMapValues()}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+     * the processing progress can be observed and additional periodic actions can be performed.
+     * <p>
+     * In order to assign a state store, the state store must be created and registered beforehand:
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+     * }</pre>
+     * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
+     * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
+     * transform()}.
+     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
+     * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
+     * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
+     * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
+     * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+     * emit a {@link KeyValue} pair.
+     * <pre>{@code
+     * new ValueTransformerSupplier() {
+     *     ValueTransformer get() {
+     *         return new ValueTransformer() {
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = context.getStateStore("myValueTransformState");
+     *                 // punctuate each second, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *             }
+     *
+     *             Iterable<NewValueType> transform(V value) {
+     *                 // can access this.state
+     *                 List<NewValueType> result = new ArrayList<>();
+     *                 for (int i = 0; i < 3; i++) {
+     *                     result.add(new NewValueType(value));
+     *                 }
+     *                 return result; // values
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code flatTransformValues()}.
+     * <p>
+     * Setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
+     * flatTransform()})
+     *
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
+     *                                 {@link ValueTransformer}
+     * @param stateStoreNames          the names of the state stores used by the processor
+     * @param <VR>                     the value type of the result stream
+     * @return a {@code KStream} that contains more or fewer records with unmodified key and new values (possibly of
+     * different type)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #flatTransform(TransformerSupplier, String...)
+     */
+    <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+                                            final String... stateStoreNames);
+
+    /**
+     * Transform the value of each input record into zero or more new values (with possibly a new
+     * type) and emit for each new value a record with the same key as the input record and the new value.
+     * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
+     * each input record value and computes zero or more new values.
+     * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+     * This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+     * the processing progress can be observed and additional periodic actions can be performed.
+     * <p>
+     * In order to assign a state store, the state store must be created and registered beforehand:
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+     * }</pre>
+     * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any
+     * {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object)
+     * transform()}.
+     * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
+     * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
+     * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
+     * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
+     * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
+     * to emit a {@link KeyValue} pair.
+     * <pre>{@code
+     * new ValueTransformerWithKeySupplier() {
+     *     ValueTransformerWithKey get() {
+     *         return new ValueTransformerWithKey() {
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = context.getStateStore("myValueTransformState");
+     *                 // punctuate each second, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *             }
+     *
+     *             Iterable<NewValueType> transform(K readOnlyKey, V value) {
+     *                 // can access this.state and use read-only key
+     *                 List<NewValueType> result = new ArrayList<>();
+     *                 for (int i = 0; i < 3; i++) {
+     *                     result.add(new NewValueType(readOnlyKey));
+     *                 }
+     *                 return result; // values
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code flatTransformValues()}.
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * So, setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
+     * flatTransform()})
+     *
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
+     *                                 {@link ValueTransformerWithKey}
+     * @param stateStoreNames          the names of the state stores used by the processor
+     * @param <VR>                     the value type of the result stream
+     * @return a {@code KStream} that contains more or fewer records with unmodified key and new values (possibly of
+     * different type)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #flatTransform(TransformerSupplier, String...)
+     */
+    <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
+                                            final String... stateStoreNames);
+
+
+    /**
      * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
      * {@link ProcessorSupplier}).
      * This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
new file mode 100644
index 0000000..40e4b37
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+
+public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn> {
+
+    private final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerSupplier;
+
+    public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerWithKeySupplier) {
+        this.valueTransformerSupplier = valueTransformerWithKeySupplier;
+    }
+
+    @Override
+    public Processor<KIn, VIn> get() {
+        return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get());
+    }
+
+    public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {
+
+        private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
+        private ProcessorContext context;
+
+        KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer) {
+            this.valueTransformer = valueTransformer;
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            valueTransformer.init(new ForwardingDisabledProcessorContext(context));
+            this.context = context;
+        }
+
+        @Override
+        public void process(final KIn key, final VIn value) {
+            final Iterable<VOut> transformedValues = valueTransformer.transform(key, value);
+            if (transformedValues != null) {
+                for (final VOut transformedValue : transformedValues) {
+                    context.forward(key, transformedValue);
+                }
+            }
+        }
+
+        @Override
+        public void close() {
+            valueTransformer.close();
+        }
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 41260c5..75df058 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -472,7 +472,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     @Override
     public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                                final String... stateStoreNames) {
-        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
 
         return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
     }
@@ -480,7 +480,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     @Override
     public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                                final String... stateStoreNames) {
-        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
 
         return doTransformValues(valueTransformerSupplier, stateStoreNames);
     }
@@ -499,7 +499,40 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
         // cannot inherit value serde
-        return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, transformNode, builder);
+        return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, transformNode, builder);
+    }
+
+    @Override
+    public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+                                                   final String... stateStoreNames) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+
+        return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
+    }
+
+    @Override
+    public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
+                                                   final String... stateStoreNames) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+
+        return doFlatTransformValues(valueTransformerSupplier, stateStoreNames);
+    }
+
+    private <VR> KStream<K, VR> doFlatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier,
+                                                      final String... stateStoreNames) {
+        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+
+        final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(new KStreamFlatTransformValues<>(valueTransformerWithKeySupplier), name),
+            stateStoreNames
+        );
+
+        transformNode.setValueChangingOperation(true);
+        builder.addGraphNode(this.streamsGraphNode, transformNode);
+
+        // cannot inherit value serde
+        return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, transformNode, builder);
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
index 8af3375..fa4e33a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -53,16 +55,11 @@ public class KStreamTransformIntegrationTest {
     private final String topic = "stream";
     private final String stateStoreName = "myTransformState";
     private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
-    private final ForeachAction<Integer, Integer> action = new ForeachAction<Integer, Integer>() {
-        @Override
-        public void apply(final Integer key, final Integer value) {
-            results.add(KeyValue.pair(key, value));
-        }
-    };
+    private final ForeachAction<Integer, Integer> action = (key, value) -> results.add(KeyValue.pair(key, value));
     private KStream<Integer, Integer> stream;
 
     @Before
-    public void before() throws InterruptedException {
+    public void before() {
         builder = new StreamsBuilder();
         final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
@@ -80,15 +77,52 @@ public class KStreamTransformIntegrationTest {
             driver.pipeInput(recordFactory.create(topic, Arrays.asList(new KeyValue<>(1, 1),
                                                                        new KeyValue<>(2, 2),
                                                                        new KeyValue<>(3, 3),
-                                                                       new KeyValue<>(1, 4),
-                                                                       new KeyValue<>(2, 5),
-                                                                       new KeyValue<>(3, 6))));
+                                                                       new KeyValue<>(2, 1),
+                                                                       new KeyValue<>(2, 3),
+                                                                       new KeyValue<>(1, 3))));
         }
         assertThat(results, equalTo(expected));
     }
 
     @Test
-    public void shouldFlatTransform() throws Exception {
+    public void shouldTransform() { // stateful 1:1 transform: emits (key + 1, value + per-key counter)
+        stream
+            .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
+                private KeyValueStore<Integer, Integer> state;
+
+                @SuppressWarnings("unchecked")
+                @Override
+                public void init(final ProcessorContext context) {
+                    state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
+                }
+
+                @Override
+                public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
+                    state.putIfAbsent(key, 0); // the first record of a key starts its counter at 0
+                    Integer storedValue = state.get(key);
+                    final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + storedValue++);
+                    state.put(key, storedValue); // storedValue was already bumped by the post-increment above
+                    return result;
+                }
+
+                @Override
+                public void close() {
+                }
+            }, "myTransformState")
+            .foreach(action);
+
+        final List<KeyValue<Integer, Integer>> expected = Arrays.asList( // assumes verifyResult pipes (1,1),(2,2),(3,3),(2,1),(2,3),(1,3) - confirm there
+            KeyValue.pair(2, 1), // input (1, 1), counter for key 1 was 0
+            KeyValue.pair(3, 2), // input (2, 2), counter for key 2 was 0
+            KeyValue.pair(4, 3), // input (3, 3), counter for key 3 was 0
+            KeyValue.pair(3, 2), // input (2, 1), counter for key 2 was 1
+            KeyValue.pair(3, 5), // input (2, 3), counter for key 2 was 2
+            KeyValue.pair(2, 4)); // input (1, 3), counter for key 1 was 1
+        verifyResult(expected);
+    }
+
+    @Test
+    public void shouldFlatTransform() {
         stream
             .flatTransform(() -> new Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
                 private KeyValueStore<Integer, Integer> state;
@@ -103,12 +137,11 @@ public class KStreamTransformIntegrationTest {
                 public Iterable<KeyValue<Integer, Integer>> transform(final Integer key, final Integer value) {
                     final List<KeyValue<Integer, Integer>> result = new ArrayList<>();
                     state.putIfAbsent(key, 0);
-                    final Integer storedValue = state.get(key);
-                    int outputValue = storedValue.intValue();
+                    Integer storedValue = state.get(key);
                     for (int i = 0; i < 3; i++) {
-                        result.add(new KeyValue<Integer, Integer>(key + i, value + outputValue++));
+                        result.add(new KeyValue<>(key + i, value + storedValue++));
                     }
-                    state.put(key, new Integer(outputValue));
+                    state.put(key, storedValue);
                     return result;
                 }
 
@@ -128,37 +161,160 @@ public class KStreamTransformIntegrationTest {
             KeyValue.pair(3, 3),
             KeyValue.pair(4, 4),
             KeyValue.pair(5, 5),
-            KeyValue.pair(1, 7),
-            KeyValue.pair(2, 8),
-            KeyValue.pair(3, 9),
-            KeyValue.pair(2, 8),
-            KeyValue.pair(3, 9),
-            KeyValue.pair(4, 10),
-            KeyValue.pair(3, 9),
-            KeyValue.pair(4, 10),
-            KeyValue.pair(5, 11));
+            KeyValue.pair(2, 4),
+            KeyValue.pair(3, 5),
+            KeyValue.pair(4, 6),
+            KeyValue.pair(2, 9),
+            KeyValue.pair(3, 10),
+            KeyValue.pair(4, 11),
+            KeyValue.pair(1, 6),
+            KeyValue.pair(2, 7),
+            KeyValue.pair(3, 8));
         verifyResult(expected);
     }
 
     @Test
-    public void shouldTransform() throws Exception {
+    public void shouldTransformValuesWithValueTransformerWithKey() {
         stream
-            .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
+            .transformValues(() -> new ValueTransformerWithKey<Integer, Integer, Integer>() {
                 private KeyValueStore<Integer, Integer> state;
 
-                @SuppressWarnings("unchecked")
                 @Override
                 public void init(final ProcessorContext context) {
-                    state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
+                    state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState");
                 }
 
                 @Override
-                public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
+                public Integer transform(final Integer key, final Integer value) {
+                    state.putIfAbsent(key, 0);
+                    Integer storedValue = state.get(key);
+                    final Integer result = value + storedValue++;
+                    state.put(key, storedValue);
+                    return result;
+                }
+
+                @Override
+                public void close() {
+                }
+            }, "myTransformState")
+            .foreach(action);
+
+        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
+            KeyValue.pair(1, 1),
+            KeyValue.pair(2, 2),
+            KeyValue.pair(3, 3),
+            KeyValue.pair(2, 2),
+            KeyValue.pair(2, 5),
+            KeyValue.pair(1, 4));
+        verifyResult(expected);
+    }
+
+    @Test
+    public void shouldTransformValuesWithValueTransformerWithoutKey() { // the transformer sees only the value and replaces it with its occurrence count
+        stream
+            .transformValues(() -> new ValueTransformer<Integer, Integer>() {
+                private KeyValueStore<Integer, Integer> state;
+
+                @Override
+                public void init(final ProcessorContext context) {
+                    state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState"); // NOTE(review): unchecked cast; literal duplicates the stateStoreName field
+                }
+
+                @Override
+                public Integer transform(final Integer value) {
+                    state.putIfAbsent(value, 0); // no key is available here, so the record value indexes the store
+                    Integer counter = state.get(value);
+                    state.put(value, ++counter); // pre-increment: the stored and returned count include this record
+                    return counter;
+                }
+
+                @Override
+                public void close() {
+                }
+            }, "myTransformState")
+            .foreach(action);
+
+        final List<KeyValue<Integer, Integer>> expected = Arrays.asList( // assumes verifyResult pipes (1,1),(2,2),(3,3),(2,1),(2,3),(1,3) - confirm there
+            KeyValue.pair(1, 1), // input (1, 1): value 1 seen once
+            KeyValue.pair(2, 1), // input (2, 2): value 2 seen once
+            KeyValue.pair(3, 1), // input (3, 3): value 3 seen once
+            KeyValue.pair(2, 2), // input (2, 1): value 1 seen twice
+            KeyValue.pair(2, 2), // input (2, 3): value 3 seen twice
+            KeyValue.pair(1, 3)); // input (1, 3): value 3 seen three times
+        verifyResult(expected);
+    }
+
+    @Test
+    public void shouldFlatTransformValuesWithKey() {
+        stream
+            .flatTransformValues(() -> new ValueTransformerWithKey<Integer, Integer, Iterable<Integer>>() {
+                private KeyValueStore<Integer, Integer> state;
+
+                @Override
+                public void init(final ProcessorContext context) {
+                    state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState");
+                }
+
+                @Override
+                public Iterable<Integer> transform(final Integer key, final Integer value) {
+                    final List<Integer> result = new ArrayList<>();
                     state.putIfAbsent(key, 0);
-                    final Integer storedValue = state.get(key);
-                    int outputValue = storedValue.intValue();
-                    final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + outputValue++);
-                    state.put(key, outputValue);
+                    Integer storedValue = state.get(key);
+                    for (int i = 0; i < 3; i++) {
+                        result.add(value + storedValue++);
+                    }
+                    state.put(key, storedValue);
+                    return result;
+                }
+
+                @Override
+                public void close() {
+                }
+            }, "myTransformState")
+            .foreach(action);
+
+        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
+            KeyValue.pair(1, 1),
+            KeyValue.pair(1, 2),
+            KeyValue.pair(1, 3),
+            KeyValue.pair(2, 2),
+            KeyValue.pair(2, 3),
+            KeyValue.pair(2, 4),
+            KeyValue.pair(3, 3),
+            KeyValue.pair(3, 4),
+            KeyValue.pair(3, 5),
+            KeyValue.pair(2, 4),
+            KeyValue.pair(2, 5),
+            KeyValue.pair(2, 6),
+            KeyValue.pair(2, 9),
+            KeyValue.pair(2, 10),
+            KeyValue.pair(2, 11),
+            KeyValue.pair(1, 6),
+            KeyValue.pair(1, 7),
+            KeyValue.pair(1, 8));
+        verifyResult(expected);
+    }
+
+    @Test
+    public void shouldFlatTransformValuesWithValueTransformerWithoutKey() {
+        stream
+            .flatTransformValues(() -> new ValueTransformer<Integer, Iterable<Integer>>() {
+                private KeyValueStore<Integer, Integer> state;
+
+                @Override
+                public void init(final ProcessorContext context) {
+                    state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState");
+                }
+
+                @Override
+                public Iterable<Integer> transform(final Integer value) {
+                    final List<Integer> result = new ArrayList<>();
+                    state.putIfAbsent(value, 0);
+                    Integer counter = state.get(value);
+                    for (int i = 0; i < 3; i++) {
+                        result.add(++counter);
+                    }
+                    state.put(value, counter);
                     return result;
                 }
 
@@ -169,13 +325,24 @@ public class KStreamTransformIntegrationTest {
             .foreach(action);
 
         final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
+            KeyValue.pair(1, 1),
+            KeyValue.pair(1, 2),
+            KeyValue.pair(1, 3),
             KeyValue.pair(2, 1),
+            KeyValue.pair(2, 2),
+            KeyValue.pair(2, 3),
+            KeyValue.pair(3, 1),
             KeyValue.pair(3, 2),
-            KeyValue.pair(4, 3),
+            KeyValue.pair(3, 3),
+            KeyValue.pair(2, 4),
+            KeyValue.pair(2, 5),
+            KeyValue.pair(2, 6),
+            KeyValue.pair(2, 4),
             KeyValue.pair(2, 5),
-            KeyValue.pair(3, 6),
-            KeyValue.pair(4, 7));
+            KeyValue.pair(2, 6),
+            KeyValue.pair(1, 7),
+            KeyValue.pair(1, 8),
+            KeyValue.pair(1, 9));
         verifyResult(expected);
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
new file mode 100644
index 0000000..36167c0
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KStreamFlatTransformValuesTest extends EasyMockSupport { // unit tests for KStreamFlatTransformValues and its processor, driven by EasyMock mocks
+
+    private Integer inputKey;
+    private Integer inputValue;
+
+    private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
+    private ProcessorContext context;
+
+    private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
+
+    @Before
+    public void setUp() {
+        inputKey = 1;
+        inputValue = 10;
+        valueTransformer = mock(ValueTransformerWithKey.class); // created through EasyMockSupport so replayAll()/verifyAll() cover it
+        context = strictMock(ProcessorContext.class); // strict mock: the order of the expected calls is verified as well
+        processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer);
+    }
+
+    @Test
+    public void shouldInitializeFlatTransformValuesProcessor() {
+        valueTransformer.init(EasyMock.isA(ForwardingDisabledProcessorContext.class)); // expectation: init is handed a ForwardingDisabledProcessorContext
+        replayAll();
+
+        processor.init(context);
+
+        verifyAll();
+    }
+
+    @Test
+    public void shouldTransformInputRecordToMultipleOutputValues() {
+        final Iterable<String> outputValues = Arrays.asList(
+                "Hello",
+                "Blue",
+                "Planet");
+        processor.init(context);
+        EasyMock.reset(valueTransformer); // discard the init() call recorded on the not-yet-replayed mock by the line above
+
+        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(outputValues);
+        for (final String outputValue : outputValues) {
+            context.forward(inputKey, outputValue); // records one expected forward per output value, with the input key unchanged
+        }
+        replayAll();
+
+        processor.process(inputKey, inputValue);
+
+        verifyAll();
+    }
+
+    @Test
+    public void shouldEmitNoRecordIfTransformReturnsEmptyList() {
+        processor.init(context);
+        EasyMock.reset(valueTransformer); // discard the init() call recorded by the line above
+
+        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(Collections.<String>emptyList());
+        replayAll(); // no forward() expectation was recorded, so any forward call on the context fails the test
+
+        processor.process(inputKey, inputValue);
+
+        verifyAll();
+    }
+
+    @Test
+    public void shouldEmitNoRecordIfTransformReturnsNull() {
+        processor.init(context);
+        EasyMock.reset(valueTransformer); // discard the init() call recorded by the line above
+
+        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(null);
+        replayAll(); // no forward() expectation was recorded: a null result must be tolerated and forward nothing
+
+        processor.process(inputKey, inputValue);
+
+        verifyAll();
+    }
+
+    @Test
+    public void shouldCloseFlatTransformValuesProcessor() {
+        valueTransformer.close(); // expectation: closing the processor closes the wrapped transformer
+        replayAll();
+
+        processor.close();
+
+        verifyAll();
+    }
+
+    @Test
+    public void shouldGetFlatTransformValuesProcessor() {
+        final ValueTransformerWithKeySupplier<Integer, Integer, Iterable<String>> valueTransformerSupplier =
+            mock(ValueTransformerWithKeySupplier.class);
+        final KStreamFlatTransformValues<Integer, Integer, String> processorSupplier =
+            new KStreamFlatTransformValues<>(valueTransformerSupplier);
+
+        EasyMock.expect(valueTransformerSupplier.get()).andReturn(valueTransformer); // expectation: the transformer supplier is asked for a transformer
+        replayAll();
+
+        final Processor<Integer, Integer> processor = processorSupplier.get(); // local deliberately shadows the processor field
+
+        verifyAll();
+        assertTrue(processor instanceof KStreamFlatTransformValuesProcessor);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 3b450b1..5a1b579 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -486,25 +486,43 @@ public class KStreamImplTest {
     }
 
     @Test
-    public void shouldNotAllowNullTransformSupplierOnTransform() {
+    public void shouldNotAllowNullTransformerSupplierOnTransform() {
         final Exception e = assertThrows(NullPointerException.class, () -> testStream.transform(null));
         assertEquals("transformerSupplier can't be null", e.getMessage());
     }
 
     @Test
-    public void shouldNotAllowNullTransformSupplierOnFlatTransform() {
+    public void shouldNotAllowNullTransformerSupplierOnFlatTransform() {
         final Exception e = assertThrows(NullPointerException.class, () -> testStream.flatTransform(null));
         assertEquals("transformerSupplier can't be null", e.getMessage());
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTransformSupplierOnTransformValues() {
-        testStream.transformValues((ValueTransformerSupplier) null);
+    @Test
+    public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValues() {
+        final Exception e =
+            assertThrows(NullPointerException.class, () -> testStream.transformValues((ValueTransformerWithKeySupplier) null));
+        assertEquals("valueTransformerSupplier can't be null", e.getMessage());
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTransformSupplierOnTransformValuesWithKey() {
-        testStream.transformValues((ValueTransformerWithKeySupplier) null);
+    @Test
+    public void shouldNotAllowNullValueTransformerSupplierOnTransformValues() {
+        final Exception e =
+            assertThrows(NullPointerException.class, () -> testStream.transformValues((ValueTransformerSupplier) null));
+        assertEquals("valueTransformerSupplier can't be null", e.getMessage());
+    }
+
+    @Test
+    public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValues() {
+        final Exception e = // the cast on null selects the ValueTransformerWithKeySupplier overload of flatTransformValues
+            assertThrows(NullPointerException.class, () -> testStream.flatTransformValues((ValueTransformerWithKeySupplier) null));
+        assertEquals("valueTransformerSupplier can't be null", e.getMessage()); // pins the exact message, not just the exception type
+    }
+
+    @Test
+    public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
+        final Exception e =
+            assertThrows(NullPointerException.class, () -> testStream.flatTransformValues((ValueTransformerSupplier) null));
+        assertEquals("valueTransformerSupplier can't be null", e.getMessage());
     }
 
     @Test(expected = NullPointerException.class)