Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/31 13:30:02 UTC

[kafka] branch trunk updated: KAFKA-6958: Overload KStream methods to allow naming operations using the new Named class (#6411)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78c55c8  KAFKA-6958: Overload KStream methods to allow naming operations using the new Named class (#6411)
78c55c8 is described below

commit 78c55c8d66f5570d975caa53a9751b126ca10538
Author: Florian Hussonnois <fl...@gmail.com>
AuthorDate: Fri May 31 15:29:43 2019 +0200

    KAFKA-6958: Overload KStream methods to allow naming operations using the new Named class (#6411)
    
    Sub-task required to allow defining custom processor names with the Kafka Streams DSL (KIP-307);
    a usage sketch follows this list:
     - overload methods for stateless operations to accept a Named parameter (filter, filterNot, map,
       mapValues, foreach, peek, branch, transform, transformValues, flatTransform)
     - overload the process method to accept a Named parameter
     - overload the join/leftJoin/outerJoin methods
    
    Reviewers: John Roesler <jo...@confluent.io>, Boyang Chen <bo...@confluent.io>,
    Bill Bejeck <bb...@gmail.com>
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 1229 ++++++++++++++++++--
 .../streams/kstream/internals/KStreamImpl.java     |  374 ++++--
 .../streams/kstream/internals/NamedInternal.java   |   19 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |  245 +++-
 .../kstream/RepartitionTopicNamingTest.java        |   66 +-
 5 files changed, 1694 insertions(+), 239 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index cd64f75..8313add 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -67,6 +67,19 @@ public interface KStream<K, V> {
     KStream<K, V> filter(final Predicate<? super K, ? super V> predicate);
 
     /**
+     * Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
+     * All records that do not satisfy the predicate are dropped.
+     * This is a stateless record-by-record operation.
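+     * <p>
+     * For example, you can drop all records with a {@code null} value and name the operator (the names below
+     * are illustrative):
+     * <pre>{@code
+     * KStream<String, String> nonNull = stream.filter((key, value) -> value != null, Named.as("drop-nulls"));
+     * }</pre>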
+     *
+     * @param predicate a filter {@link Predicate} that is applied to each record
+     * @param named     a {@link Named} config used to name the processor in the topology
+     * @return a {@code KStream} that contains only those records that satisfy the given predicate
+     * @see #filterNot(Predicate)
+     */
+    KStream<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named);
+
+    /**
      * Create a new {@code KStream} that consists of all records of this stream which do <em>not</em> satisfy the given
      * predicate.
      * All records that <em>do</em> satisfy the predicate are dropped.
@@ -79,6 +92,19 @@ public interface KStream<K, V> {
     KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
 
     /**
+     * Create a new {@code KStream} that consists of all records of this stream which do <em>not</em> satisfy the given
+     * predicate.
+     * All records that <em>do</em> satisfy the predicate are dropped.
+     * This is a stateless record-by-record operation.
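+     * <p>
+     * For example, you can keep only the records with a {@code null} value (the names below are illustrative):
+     * <pre>{@code
+     * KStream<String, String> nullsOnly = stream.filterNot((key, value) -> value != null, Named.as("nulls-only"));
+     * }</pre>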
+     *
+     * @param predicate a filter {@link Predicate} that is applied to each record
+     * @param named     a {@link Named} config used to name the processor in the topology
+     * @return a {@code KStream} that contains only those records that do <em>not</em> satisfy the given predicate
+     * @see #filter(Predicate)
+     */
+    KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named);
+
+    /**
      * Set a new key (with possibly new type) for each input record.
      * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
@@ -111,6 +137,40 @@ public interface KStream<K, V> {
     <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 
     /**
+     * Set a new key (with possibly new type) for each input record.
+     * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * For example, you can use this transformation to set a key for a key-less input record {@code <null,V>} by
+     * extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key as the
+     * length of the value string.
+     * <pre>{@code
+     * KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
+     * KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer>() {
+     *     Integer apply(Byte[] key, String value) {
+     *         return value.length();
+     *     }
+     * });
+     * }</pre>
+     * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
+     * join) is applied to the result {@code KStream}.
+     *
+     * @param mapper a {@link KeyValueMapper} that computes a new key for each record
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @param <KR>   the new key type of the result stream
+     * @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
+     * @see #map(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
+     */
+    <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
+                                  final Named named);
+
+    /**
      * Transform each record of the input stream into a new record in the output stream (both key and value type can be
      * altered arbitrarily).
      * The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
@@ -149,6 +209,46 @@ public interface KStream<K, V> {
     <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
 
     /**
+     * Transform each record of the input stream into a new record in the output stream (both key and value type can be
+     * altered arbitrarily).
+     * The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
+     * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
+     * stateful record transformation).
+     * <p>
+     * The example below normalizes the String key to upper-case letters and counts the number of tokens of the value string.
+     * <pre>{@code
+     * KStream<String, String> inputStream = builder.stream("topic");
+     * KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
+     *     KeyValue<String, Integer> apply(String key, String value) {
+     *         return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
+     *     }
+     * });
+     * }</pre>
+     * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
+     * <p>
+     * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
+     * join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
+     *
+     * @param mapper a {@link KeyValueMapper} that computes a new output record
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @param <KR>   the key type of the result stream
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains records with new key and value (possibly both of different type)
+     * @see #selectKey(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     */
+    <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
+                                 final Named named);
+
+    /**
      * Transform the value of each input record into a new value (with possible new type) of the output record.
      * The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -182,6 +282,43 @@ public interface KStream<K, V> {
      */
     <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
 
+    /**
+     * Transform the value of each input record into a new value (with possible new type) of the output record.
+     * The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateless record-by-record operation (cf.
+     * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
+     * <p>
+     * The example below counts the number of tokens of the value string.
+     * <pre>{@code
+     * KStream<String, String> inputStream = builder.stream("topic");
+     * KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
+     *     Integer apply(String value) {
+     *         return value.split(" ").length;
+     *     }
+     * });
+     * }</pre>
+     * Setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+     *
+     * @param mapper a {@link ValueMapper} that computes a new output value
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @see #selectKey(KeyValueMapper)
+     * @see #map(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     */
+    <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
+                                  final Named named);
+
     /**
      * Transform the value of each input record into a new value (with possible new type) of the output record.
      * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
@@ -218,6 +355,43 @@ public interface KStream<K, V> {
     <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
 
     /**
+     * Transform the value of each input record into a new value (with possible new type) of the output record.
+     * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateless record-by-record operation (cf.
+     * {@link #transformValues(ValueTransformerWithKeySupplier, String...)} for stateful value transformation).
+     * <p>
+     * The example below counts the number of tokens of key and value strings.
+     * <pre>{@code
+     * KStream<String, String> inputStream = builder.stream("topic");
+     * KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
+     *     Integer apply(String readOnlyKey, String value) {
+     *         return readOnlyKey.split(" ").length + value.split(" ").length;
+     *     }
+     * });
+     * }</pre>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * So, setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+     *
+     * @param mapper a {@link ValueMapperWithKey} that computes a new output value
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @see #selectKey(KeyValueMapper)
+     * @see #map(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     */
+    <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
+                                  final Named named);
+
+    /**
      * Transform each record of the input stream into zero or more records in the output stream (both key and value type
      * can be altered arbitrarily).
      * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
@@ -269,6 +443,59 @@ public interface KStream<K, V> {
     <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 
     /**
+     * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+     * can be altered arbitrarily).
+     * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
+     * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
+     * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
+     * stateful record transformation).
+     * <p>
+     * The example below splits input records {@code <null:String>} containing sentences as values into their words
+     * and emits a record {@code <word:1>} for each word.
+     * <pre>{@code
+     * KStream<byte[], String> inputStream = builder.stream("topic");
+     * KStream<String, Integer> outputStream = inputStream.flatMap(
+     *     new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
+     *         Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
+     *             String[] tokens = value.split(" ");
+     *             List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
+     *
+     *             for (String token : tokens) {
+     *                 result.add(new KeyValue<>(token, 1));
+     *             }
+     *
+     *             return result;
+     *         }
+     *     });
+     * }</pre>
+     * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+     * and the return value must not be {@code null}.
+     * <p>
+     * Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation
+     * or join) is applied to the result {@code KStream}. (cf. {@link #flatMapValues(ValueMapper)})
+     *
+     * @param mapper a {@link KeyValueMapper} that computes the new output records
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @param <KR>   the key type of the result stream
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains more or fewer records with new key and value (possibly of different type)
+     * @see #selectKey(KeyValueMapper)
+     * @see #map(KeyValueMapper)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #flatTransform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #flatTransformValues(ValueTransformerSupplier, String...)
+     * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
+     */
+    <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper,
+                                     final Named named);
+
+    /**
      * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
      * with the same key in the new stream.
      * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
@@ -316,6 +543,50 @@ public interface KStream<K, V> {
      * with the same key in the new stream.
      * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
      * stream (value type can be altered arbitrarily).
+     * The provided {@link ValueMapper} is applied to each input record and computes zero or more output values.
+     * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+     * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
+     * for stateful value transformation).
+     * <p>
+     * The example below splits input records {@code <null:String>} containing sentences as values into their words.
+     * <pre>{@code
+     * KStream<byte[], String> inputStream = builder.stream("topic");
+     * KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
+     *     Iterable<String> apply(String value) {
+     *         return Arrays.asList(value.split(" "));
+     *     }
+     * });
+     * }</pre>
+     * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+     * and the return value must not be {@code null}.
+     * <p>
+     * Splitting a record into multiple records with the same key preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+     *
+     * @param mapper a {@link ValueMapper} that computes the new output values
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains more or fewer records with unmodified keys and new values of different type
+     * @see #selectKey(KeyValueMapper)
+     * @see #map(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #flatTransform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #flatTransformValues(ValueTransformerSupplier, String...)
+     * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
+     */
+    <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper,
+                                      final Named named);
+
+    /**
+     * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
+     * with the same key in the new stream.
+     * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
+     * stream (value type can be altered arbitrarily).
      * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values.
      * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
      * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
@@ -361,13 +632,63 @@ public interface KStream<K, V> {
     <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper);
 
     /**
-     * Print the records of this KStream using the options provided by {@link Printed}
-     * Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print.
-     * It <em>SHOULD NOT</em> be used for production usage if performance requirements are concerned.
-     *
-     * @param printed options for printing
-     */
-    void print(final Printed<K, V> printed);
+     * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
+     * with the same key in the new stream.
+     * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
+     * stream (value type can be altered arbitrarily).
+     * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values.
+     * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+     * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
+     * for stateful value transformation).
+     * <p>
+     * The example below splits input records {@code <Integer:String>}, with key=1, containing sentences as values
+     * into their words.
+     * <pre>{@code
+     * KStream<Integer, String> inputStream = builder.stream("topic");
+     * KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>>() {
+     *     Iterable<String> apply(Integer readOnlyKey, String value) {
+     *         if (readOnlyKey == 1) {
+     *             return Arrays.asList(value.split(" "));
+     *         } else {
+     *             return Arrays.asList(value);
+     *         }
+     *     }
+     * });
+     * }</pre>
+     * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+     * and the return value must not be {@code null}.
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * So, splitting a record into multiple records with the same key preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+     *
+     * @param mapper a {@link ValueMapperWithKey} that computes the new output values
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains more or fewer records with unmodified keys and new values of different type
+     * @see #selectKey(KeyValueMapper)
+     * @see #map(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #flatTransform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #flatTransformValues(ValueTransformerSupplier, String...)
+     * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
+     */
+    <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
+                                      final Named named);
+
+    /**
+     * Print the records of this {@code KStream} using the options provided by {@link Printed}.
+     * Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print.
+     * It <em>SHOULD NOT</em> be used in production if performance is a concern.
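+     * <p>
+     * For example, to print each record to stdout:
+     * <pre>{@code
+     * stream.print(Printed.toSysOut());
+     * }</pre>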
+     *
+     * @param printed options for printing
+     */
+    void print(final Printed<K, V> printed);
 
     /**
      * Perform an action on each record of {@code KStream}.
@@ -382,6 +703,17 @@ public interface KStream<K, V> {
     /**
      * Perform an action on each record of {@code KStream}.
      * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
+     * Note that this is a terminal operation that returns void.
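+     * <p>
+     * For example, to log every record (the processor name below is illustrative):
+     * <pre>{@code
+     * stream.foreach((key, value) -> System.out.println(key + " => " + value), Named.as("log-each-record"));
+     * }</pre>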
+     *
+     * @param action an action to perform on each record
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @see #process(ProcessorSupplier, String...)
+     */
+    void foreach(final ForeachAction<? super K, ? super V> action, final Named named);
+
+    /**
+     * Perform an action on each record of {@code KStream}.
+     * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
      * <p>
      * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
      * and returns an unchanged stream.
@@ -395,6 +727,22 @@ public interface KStream<K, V> {
     KStream<K, V> peek(final ForeachAction<? super K, ? super V> action);
 
     /**
+     * Perform an action on each record of {@code KStream}.
+     * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
+     * <p>
+     * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
+     * and returns an unchanged stream.
+     * <p>
+     * Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
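+     * <p>
+     * For example (the processor name below is illustrative):
+     * <pre>{@code
+     * KStream<byte[], String> unmodified = stream.peek(
+     *     (key, value) -> System.out.println("key=" + key + ", value=" + value),
+     *     Named.as("log-records"));
+     * }</pre>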
+     *
+     * @param action an action to perform on each record
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @return itself
+     * @see #process(ProcessorSupplier, String...)
+     */
+    KStream<K, V> peek(final ForeachAction<? super K, ? super V> action, final Named named);
+
+    /**
      * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
      * the supplied predicates.
      * Each record is evaluated against the supplied predicates, and predicates are evaluated in order.
@@ -411,6 +759,23 @@ public interface KStream<K, V> {
     KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates);
 
     /**
+     * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
+     * the supplied predicates.
+     * Each record is evaluated against the supplied predicates, and predicates are evaluated in order.
+     * Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates.
+     * The branching happens on first-match: A record in the original stream is assigned to the corresponding result
+     * stream for the first predicate that evaluates to true, and is assigned to this stream only.
+     * A record will be dropped if none of the predicates evaluate to true.
+     * This is a stateless record-by-record operation.
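+     * <p>
+     * For example (the predicates and name below are illustrative):
+     * <pre>{@code
+     * KStream<String, Long>[] branches = stream.branch(Named.as("split"),
+     *     (key, value) -> value > 100,  // branches[0]
+     *     (key, value) -> true          // branches[1]: catch-all for remaining records
+     * );
+     * }</pre>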
+     *
+     * @param named      a {@link Named} config used to name the processor in the topology
+     * @param predicates the ordered list of {@link Predicate} instances
+     * @return multiple distinct substreams of this {@code KStream}
+     */
+    @SuppressWarnings("unchecked")
+    KStream<K, V>[] branch(final Named named, final Predicate<? super K, ? super V>... predicates);
+
+    /**
      * Merge this stream and the given stream into one larger stream.
      * <p>
      * There is no ordering guarantee between records from this {@code KStream} and records from
@@ -424,6 +789,20 @@ public interface KStream<K, V> {
     KStream<K, V> merge(final KStream<K, V> stream);
 
     /**
+     * Merge this stream and the given stream into one larger stream.
+     * <p>
+     * There is no ordering guarantee between records from this {@code KStream} and records from
+     * the provided {@code KStream} in the merged stream.
+     * Relative order is preserved within each input stream though (i.e., records within one input
+     * stream are processed in order).
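+     * <p>
+     * For example (stream and processor names are illustrative):
+     * <pre>{@code
+     * KStream<String, Long> merged = leftStream.merge(rightStream, Named.as("merged-stream"));
+     * }</pre>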
+     *
+     * @param stream a stream which is to be merged into this stream
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @return a merged stream containing all records from this and the provided {@code KStream}
+     */
+    KStream<K, V> merge(final KStream<K, V> stream, final Named named);
+
+    /**
      * Materialize this stream to a topic and create a new {@code KStream} from the topic using default serializers,
      * deserializers, and producer's {@link DefaultPartitioner}.
      * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
@@ -590,6 +969,99 @@ public interface KStream<K, V> {
                                        final String... stateStoreNames);
 
     /**
+     * Transform each record of the input stream into zero or one record in the output stream (both key and value type
+     * can be altered arbitrarily).
+     * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
+     * returns zero or one output record.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
+     * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper) map()}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()},
+     * the processing progress can be observed and additional periodic actions can be performed.
+     * <p>
+     * In order to assign a state store, the state store must be created and registered beforehand (it's not required
+     * to connect global state stores; read-only access to global state stores is available by default):
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
+     * }</pre>
+     * Within the {@link Transformer}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object, Object)
+     * transform()}.
+     * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+     * in which case no record is emitted.
+     * <pre>{@code
+     * new TransformerSupplier() {
+     *     Transformer get() {
+     *         return new Transformer() {
+     *             private ProcessorContext context;
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.context = context;
+     *                 this.state = context.getStateStore("myTransformState");
+     *                 // punctuate each second; can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *             }
+     *
+     *             KeyValue transform(K key, V value) {
+     *                 // can access this.state
+     *                 return new KeyValue(key, value); // can emit a single value via return -- can also be null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transform()}.
+     * <p>
+     * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
+     * or join) is applied to the result {@code KStream}.
+     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()} )
+     * <p>
+     * Note that it is possible to emit multiple records for each input record by using
+     * {@link ProcessorContext#forward(Object, Object) context#forward()} in
+     * {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+     * detected at runtime.
+     * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+     * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * If in {@link Transformer#transform(Object, Object) Transformer#transform()} multiple records need to be emitted
+     * for each input record, it is recommended to use {@link #flatTransform(TransformerSupplier, String...)
+     * flatTransform()}.
+     *
+     * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
+     * @param named               a {@link Named} config used to name the processor in the topology
+     * @param stateStoreNames     the names of the state stores used by the processor
+     * @param <K1>                the key type of the new stream
+     * @param <V1>                the value type of the new stream
+     * @return a {@code KStream} that contains more or fewer records with new key and value (possibly of different type)
+     * @see #map(KeyValueMapper)
+     * @see #flatTransform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #process(ProcessorSupplier, String...)
+     */
+    <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
+                                       final Named named,
+                                       final String... stateStoreNames);
+
+    /**
      * Transform each record of the input stream into zero or more records in the output stream (both key and value type
      * can be altered arbitrarily).
      * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
@@ -682,52 +1154,465 @@ public interface KStream<K, V> {
                                            final String... stateStoreNames);
 
     /**
-     * Transform the value of each input record into a new value (with possibly a new type) of the output record.
-     * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
-     * record value and computes a new value for it.
-     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
-     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
-     * can be observed and additional periodic actions can be performed.
+     * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+     * can be altered arbitrarily).
+     * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
+     * returns zero or more output records.
+     * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
+     * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper) flatMap()} for stateless
+     * record transformation).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+     * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
-     * connect global state stores; read-only access to global state stores is available by default):
+     * In order to assign a state store, the state store must be created and registered beforehand (it's not required
+     * to connect global state stores; read-only access to global state stores is available by default):
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
      *                 Serdes.String(),
      *                 Serdes.String());
      * // register store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+     * KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
-     * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
-     * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
-     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@code null},
-     * no records are emitted.
-     * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
-     * pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
-     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
-     * emit a {@link KeyValue} pair.
+     * Within the {@link Transformer}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
+     * punctuate()}, a schedule must be registered.
+     * The {@link Transformer} must return an {@link java.lang.Iterable} type (e.g., any {@link java.util.Collection}
+     * type) in {@link Transformer#transform(Object, Object) transform()}.
+     * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+     * which is equivalent to returning an empty {@link java.lang.Iterable Iterable}, i.e., no records are emitted.
      * <pre>{@code
-     * new ValueTransformerSupplier() {
-     *     ValueTransformer get() {
-     *         return new ValueTransformer() {
+     * new TransformerSupplier() {
+     *     Transformer get() {
+     *         return new Transformer() {
+     *             private ProcessorContext context;
      *             private StateStore state;
      *
      *             void init(ProcessorContext context) {
-     *                 this.state = context.getStateStore("myValueTransformState");
-     *                 // punctuate each second, can access this.state
+     *                 this.context = context;
+     *                 this.state = context.getStateStore("myTransformState");
+     *                 // punctuate each second; can access this.state
      *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
      *             }
      *
-     *             NewValueType transform(V value) {
+     *             Iterable<KeyValue> transform(K key, V value) {
      *                 // can access this.state
-     *                 return new NewValueType(); // or null
+     *                 List<KeyValue> result = new ArrayList<>();
+     *                 for (int i = 0; i < 3; i++) {
+     *                     result.add(new KeyValue(key, value));
+     *                 }
+     *                 return result; // emits a list of key-value pairs via return
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code flatTransform()}.
+     * <p>
+     * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
+     * or join) is applied to the result {@code KStream}.
+     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()})
+     * <p>
+     * Note that it is possible to emit records by using {@link ProcessorContext#forward(Object, Object)
+     * context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+     * detected at runtime.
+     * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+     * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     *
+     * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
+     * @param named               a {@link Named} config used to name the processor in the topology
+     * @param stateStoreNames     the names of the state stores used by the processor
+     * @param <K1>                the key type of the new stream
+     * @param <V1>                the value type of the new stream
+     * @return a {@code KStream} that contains more or fewer records with new key and value (possibly of different type)
+     * @see #flatMap(KeyValueMapper)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #process(ProcessorSupplier, String...)
+     */
+    <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
+                                           final Named named,
+                                           final String... stateStoreNames);
+
+    /**
+     * Transform the value of each input record into a new value (with possibly a new type) of the output record.
+     * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+     * record value and computes a new value for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+     * can be observed and additional periodic actions can be performed.
+     * <p>
+     * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+     * connect global state stores; read-only access to global state stores is available by default):
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+     * }</pre>
+     * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
+     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@code null},
+     * no records are emitted.
+     * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
+     * pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+     * emit a {@link KeyValue} pair.
+     * <pre>{@code
+     * new ValueTransformerSupplier() {
+     *     ValueTransformer get() {
+     *         return new ValueTransformer() {
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = context.getStateStore("myValueTransformState");
+     *                 // punctuate each second, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *             }
+     *
+     *             NewValueType transform(V value) {
+     *                 // can access this.state
+     *                 return new NewValueType(); // or null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transformValues()}.
+     * <p>
+     * Setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+     *
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
+     *                                 {@link ValueTransformer}
+     * @param stateStoreNames          the names of the state stores used by the processor
+     * @param <VR>                     the value type of the result stream
+     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     */
+    <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
+                                        final String... stateStoreNames);
+
+    /**
+     * Transform the value of each input record into a new value (with possibly a new type) of the output record.
+     * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+     * record value and computes a new value for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+     * can be observed and additional periodic actions can be performed.
+     * <p>
+     * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+     * connect global state stores; read-only access to global state stores is available by default):
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+     * }</pre>
+     * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
+     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@code null}, no
+     * records are emitted.
+     * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
+     * pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+     * emit a {@link KeyValue} pair.
+     * <pre>{@code
+     * new ValueTransformerSupplier() {
+     *     ValueTransformer get() {
+     *         return new ValueTransformer() {
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = context.getStateStore("myValueTransformState");
+     *                 // punctuate each second, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *             }
+     *
+     *             NewValueType transform(V value) {
+     *                 // can access this.state
+     *                 return new NewValueType(); // or null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transformValues()}.
+     * <p>
+     * Setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+     *
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
+     *                                 {@link ValueTransformer}
+     * @param named                    a {@link Named} config used to name the processor in the topology
+     * @param stateStoreNames          the names of the state stores used by the processor
+     * @param <VR>                     the value type of the result stream
+     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     */
+    <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
+                                        final Named named,
+                                        final String... stateStoreNames);
+
+    /**
+     * Transform the value of each input record into a new value (with possibly a new type) of the output record.
+     * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
+     * each input record value and computes a new value for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+     * can be observed and additional periodic actions can be performed.
+     * <p>
+     * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+     * connect global state stores; read-only access to global state stores is available by default):
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+     * }</pre>
+     * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * The {@link ValueTransformerWithKey} must return the new value in
+     * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
+     * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
+     * is {@code null}, no records are emitted.
+     * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
+     * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
+     * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
+     * to emit a {@link KeyValue} pair.
+     * <pre>{@code
+     * new ValueTransformerWithKeySupplier() {
+     *     ValueTransformerWithKey get() {
+     *         return new ValueTransformerWithKey() {
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = context.getStateStore("myValueTransformState");
+     *                 // punctuate each second, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *             }
+     *
+     *             NewValueType transform(K readOnlyKey, V value) {
+     *                 // can access this.state and use read-only key
+     *                 return new NewValueType(readOnlyKey); // or null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transformValues()}.
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * So, setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+     *
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
+     *                                 {@link ValueTransformerWithKey}
+     * @param stateStoreNames          the names of the state stores used by the processor
+     * @param <VR>                     the value type of the result stream
+     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     */
+    <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+                                        final String... stateStoreNames);
+
+    /**
+     * Transform the value of each input record into a new value (with possibly a new type) of the output record.
+     * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
+     * each input record value and computes a new value for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+     * can be observed and additional periodic actions can be performed.
+     * <p>
+     * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+     * connect global state stores; read-only access to global state stores is available by default):
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+     * }</pre>
+     * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * The {@link ValueTransformerWithKey} must return the new value in
+     * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
+     * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
+     * is {@code null}, no records are emitted.
+     * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
+     * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
+     * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
+     * to emit a {@link KeyValue} pair.
+     * <pre>{@code
+     * new ValueTransformerWithKeySupplier() {
+     *     ValueTransformerWithKey get() {
+     *         return new ValueTransformerWithKey() {
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = context.getStateStore("myValueTransformState");
+     *                 // punctuate each second, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *             }
+     *
+     *             NewValueType transform(K readOnlyKey, V value) {
+     *                 // can access this.state and use read-only key
+     *                 return new NewValueType(readOnlyKey); // or null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transformValues()}.
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * So, setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+     *
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
+     *                                 {@link ValueTransformerWithKey}
+     * @param named                    a {@link Named} config used to name the processor in the topology
+     * @param stateStoreNames          the names of the state stores used by the processor
+     * @param <VR>                     the value type of the result stream
+     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     */
+    <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+                                        final Named named,
+                                        final String... stateStoreNames);
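A hedged sketch of this keyed, stateful variant, assuming a KeyValueStore<String, Long> was registered under the hypothetical name "counts" via builder.addStateStore(...):

    KStream<String, Long> updateCounts = input.transformValues(
        () -> new ValueTransformerWithKey<String, String, Long>() {
            private KeyValueStore<String, Long> state;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                state = (KeyValueStore<String, Long>) context.getStateStore("counts");
            }

            @Override
            public Long transform(final String readOnlyKey, final String value) {
                // count how many values were seen for this (read-only) key
                final Long previous = state.get(readOnlyKey);
                final long updated = (previous == null ? 0L : previous) + 1L;
                state.put(readOnlyKey, updated);
                return updated;
            }

            @Override
            public void close() { }
        },
        Named.as("COUNT-UPDATES"),
        "counts");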
+    /**
+     * Transform the value of each input record into zero or more new values (with possibly a new
+     * type) and emit for each new value a record with the same key as the input record and the new value.
+     * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+     * record value and computes zero or more new values.
+     * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+     * the processing progress can be observed and additional periodic actions can be performed.
+     * <p>
+     * In order to assign a state store, the state store must be created and registered beforehand:
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+     * }</pre>
+     * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
+     * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
+     * transform()}.
+     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
+     * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
+     * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
+     * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
+     * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+     * emit a {@link KeyValue} pair.
+     * <pre>{@code
+     * new ValueTransformerSupplier() {
+     *     ValueTransformer get() {
+     *         return new ValueTransformer() {
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = context.getStateStore("myValueTransformState");
+     *                 // punctuate each second, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *             }
+     *
+     *             Iterable<NewValueType> transform(V value) {
+     *                 // can access this.state
+     *                 List<NewValueType> result = new ArrayList<>();
+     *                 for (int i = 0; i < 3; i++) {
+     *                     result.add(new NewValueType(value));
+     *                 }
+     *                 return result; // values
      *             }
      *
      *             void close() {
@@ -739,35 +1624,38 @@ public interface KStream<K, V> {
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
      * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
-     * {@code transformValues()}.
+     * {@code flatTransformValues()}.
      * <p>
      * Setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+     * is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
+     * flatTransform()})
      *
-     * @param valueTransformerSupplier a instance of {@link ValueTransformerSupplier} that generates a
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
      *                                 {@link ValueTransformer}
      * @param stateStoreNames          the names of the state stores used by the processor
      * @param <VR>                     the value type of the result stream
-     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
+     * different type)
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #transform(TransformerSupplier, String...)
+     * @see #flatTransform(TransformerSupplier, String...)
      */
-    <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
-                                        final String... stateStoreNames);
+    <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+                                            final String... stateStoreNames);
 
     /**
-     * Transform the value of each input record into a new value (with possibly a new type) of the output record.
-     * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
-     * each input record value and computes a new value for it.
-     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
-     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
-     * can be observed and additional periodic actions can be performed.
+     * Transform the value of each input record into zero or more new values (with possibly a new
+     * type) and emit for each new value a record with the same key as the input record and the new value.
+     * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+     * record value and computes zero or more new values.
+     * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+     * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
-     * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
-     * connect global state stores; read-only access to global state stores is available by default):
+     * In order to assign a state store, the state store must be created and registered beforehand:
      * <pre>{@code
      * // create store
      * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -777,24 +1665,25 @@ public interface KStream<K, V> {
      * // register store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
      * }</pre>
-     * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
+     * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
-     * The {@link ValueTransformerWithKey} must return the new value in
-     * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
-     * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
-     * is {@code null}, no records are emitted.
+     * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
+     * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
+     * transform()}.
+     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
+     * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
      * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
      * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
      * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
-     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
-     * to emit a {@link KeyValue} pair.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+     * emit a {@link KeyValue} pair.
      * <pre>{@code
-     * new ValueTransformerWithKeySupplier() {
-     *     ValueTransformerWithKey get() {
-     *         return new ValueTransformerWithKey() {
+     * new ValueTransformerSupplier() {
+     *     ValueTransformer get() {
+     *         return new ValueTransformer() {
      *             private StateStore state;
      *
      *             void init(ProcessorContext context) {
@@ -803,9 +1692,13 @@ public interface KStream<K, V> {
      *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
      *             }
      *
-     *             NewValueType transform(K readOnlyKey, V value) {
-     *                 // can access this.state and use read-only key
-     *                 return new NewValueType(readOnlyKey); // or null
+     *             Iterable<NewValueType> transform(V value) {
+     *                 // can access this.state
+     *                 List<NewValueType> result = new ArrayList<>();
+     *                 for (int i = 0; i < 3; i++) {
+     *                     result.add(new NewValueType(value));
+     *                 }
+     *                 return result; // values
      *             }
      *
      *             void close() {
@@ -817,34 +1710,38 @@ public interface KStream<K, V> {
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
      * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
-     * {@code transformValues()}.
+     * {@code flatTransformValues()}.
      * <p>
-     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
-     * So, setting a new value preserves data co-location with respect to the key.
+     * Setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+     * is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
+     * flatTransform()})
      *
-     * @param valueTransformerSupplier a instance of {@link ValueTransformerWithKeySupplier} that generates a
-     *                                 {@link ValueTransformerWithKey}
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
+     *                                 {@link ValueTransformer}
+     * @param named                    a {@link Named} config used to name the processor in the topology
      * @param stateStoreNames          the names of the state stores used by the processor
      * @param <VR>                     the value type of the result stream
-     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
+     * different type)
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #transform(TransformerSupplier, String...)
+     * @see #flatTransform(TransformerSupplier, String...)
      */
-    <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
-                                        final String... stateStoreNames);
+    <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+                                            final Named named,
+                                            final String... stateStoreNames);
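For illustration, a minimal sketch of a named flatTransformValues() (the stream name and processor name are hypothetical; java.util.Arrays and java.util.Collections are assumed imported):

    KStream<String, String> words = input.flatTransformValues(
        () -> new ValueTransformer<String, Iterable<String>>() {
            @Override
            public void init(final ProcessorContext context) { }

            @Override
            public Iterable<String> transform(final String value) {
                // zero output records for null input, one record per word otherwise
                return value == null
                    ? Collections.<String>emptyList()
                    : Arrays.asList(value.split("\\W+"));
            }

            @Override
            public void close() { }
        },
        Named.as("SPLIT-INTO-WORDS"));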
 
     /**
      * Transform the value of each input record into zero or more new values (with possibly a new
     * type) and emit for each new value a record with the same key as the input record and the new value.
-     * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
-     * record value and computes zero or more new values.
+     * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
+     * each input record value and computes zero or more new values.
      * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
-     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
-     * the processing progress can be observed and additional periodic actions can be performed.
+     * This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can
+     * be observed and additional periodic actions can be performed.
      * <p>
      * In order to assign a state store, the state store must be created and registered beforehand:
      * <pre>{@code
@@ -856,25 +1753,25 @@ public interface KStream<K, V> {
      * // register store
      * builder.addStateStore(keyValueStoreBuilder);
      *
-     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+     * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
      * }</pre>
-     * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
+     * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
-     * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
-     * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
+     * The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any
+     * {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object)
      * transform()}.
-     * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
-     * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
+     * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
+     * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
      * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
      * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
      * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
-     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
-     * emit a {@link KeyValue} pair.
+     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
+     * to emit a {@link KeyValue} pair.
      * <pre>{@code
-     * new ValueTransformerSupplier() {
-     *     ValueTransformer get() {
-     *         return new ValueTransformer() {
+     * new ValueTransformerWithKeySupplier() {
+     *     ValueTransformerWithKey get() {
+     *         return new ValueTransformerWithKey() {
      *             private StateStore state;
      *
      *             void init(ProcessorContext context) {
@@ -883,11 +1780,11 @@ public interface KStream<K, V> {
      *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
      *             }
      *
-     *             Iterable<NewValueType> transform(V value) {
-     *                 // can access this.state
+     *             Iterable<NewValueType> transform(K readOnlyKey, V value) {
+     *                 // can access this.state and use read-only key
      *                 List<NewValueType> result = new ArrayList<>();
      *                 for (int i = 0; i < 3; i++) {
-     *                     result.add(new NewValueType(value));
+     *                     result.add(new NewValueType(readOnlyKey));
      *                 }
      *                 return result; // values
      *             }
@@ -903,13 +1800,14 @@ public interface KStream<K, V> {
      * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
      * {@code flatTransformValues()}.
      * <p>
-     * Setting a new value preserves data co-location with respect to the key.
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * So, setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
      * is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
      * flatTransform()})
      *
-     * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
-     *                                 {@link ValueTransformer}
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
+     *                                 {@link ValueTransformerWithKey}
      * @param stateStoreNames          the names of the state stores used by the processor
      * @param <VR>                     the value type of the result stream
      * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
@@ -919,7 +1817,7 @@ public interface KStream<K, V> {
      * @see #transform(TransformerSupplier, String...)
      * @see #flatTransform(TransformerSupplier, String...)
      */
-    <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+    <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
                                             final String... stateStoreNames);
 
     /**
@@ -929,8 +1827,8 @@ public interface KStream<K, V> {
      * each input record value and computes zero or more new values.
      * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
      * This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()} the processing
-     * progress can be observed and additional periodic actions can be performed.
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can
+     * be observed and additional periodic actions can be performed.
      * <p>
      * In order to assign a state store, the state store must be created and registered beforehand:
      * <pre>{@code
@@ -997,6 +1895,7 @@ public interface KStream<K, V> {
      *
     * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
      *                                 {@link ValueTransformerWithKey}
+     * @param named                    a {@link Named} config used to name the processor in the topology
      * @param stateStoreNames          the names of the state stores used by the processor
      * @param <VR>                     the value type of the result stream
      * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
@@ -1007,8 +1906,67 @@ public interface KStream<K, V> {
      * @see #flatTransform(TransformerSupplier, String...)
      */
     <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
+                                            final Named named,
                                             final String... stateStoreNames);
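And a hedged sketch of the key-aware variant with a Named argument (all names hypothetical); note the key is only read, never modified:

    KStream<String, String> tagged = input.flatTransformValues(
        () -> new ValueTransformerWithKey<String, String, Iterable<String>>() {
            @Override
            public void init(final ProcessorContext context) { }

            @Override
            public Iterable<String> transform(final String readOnlyKey, final String value) {
                // drop null values, otherwise emit one record tagged with its key
                return value == null
                    ? Collections.<String>emptyList()
                    : Collections.singletonList(readOnlyKey + ":" + value);
            }

            @Override
            public void close() { }
        },
        Named.as("TAG-WITH-KEY"));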
 
+    /**
+     * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
+     * {@link ProcessorSupplier}).
+     * This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+     * can be observed and additional periodic actions can be performed.
+     * Note that this is a terminal operation that returns void.
+     * <p>
+     * In order to assign a state store, the state store must be created and registered beforehand (it's not required to connect
+     * global state stores; read-only access to global state stores is available by default):
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * inputStream.process(new ProcessorSupplier() { ... }, "myProcessorState");
+     * }</pre>
+     * Within the {@link Processor}, the state store is obtained via the {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * <pre>{@code
+     * new ProcessorSupplier() {
+     *     Processor get() {
+     *         return new Processor() {
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = context.getStateStore("myProcessorState");
+     *                 // punctuate each second, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *             }
+     *
+     *             void process(K key, V value) {
+     *                 // can access this.state
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code process()}.
+     *
+     * @param processorSupplier an instance of {@link ProcessorSupplier} that generates a {@link Processor}
+     * @param stateStoreNames   the names of the state stores used by the processor
+     * @see #foreach(ForeachAction)
+     * @see #transform(TransformerSupplier, String...)
+     */
+    void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
+                 final String... stateStoreNames);
 
     /**
      * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
@@ -1062,11 +2020,13 @@ public interface KStream<K, V> {
     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code process()}.
      *
     * @param processorSupplier an instance of {@link ProcessorSupplier} that generates a {@link Processor}
+     * @param named             a {@link Named} config used to name the processor in the topology
     * @param stateStoreNames   the names of the state stores used by the processor
      * @see #foreach(ForeachAction)
      * @see #transform(TransformerSupplier, String...)
      */
     void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
+                 final Named named,
                  final String... stateStoreNames);
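A hedged sketch of this named terminal operation, assuming the "myProcessorState" store from the example above was registered:

    input.process(
        () -> new Processor<String, String>() {
            private KeyValueStore<String, String> state;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                state = (KeyValueStore<String, String>) context.getStateStore("myProcessorState");
            }

            @Override
            public void process(final String key, final String value) {
                state.put(key, value); // remember the latest value per key
            }

            @Override
            public void close() { }
        },
        Named.as("LATEST-VALUE-WRITER"),
        "myProcessorState");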
 
     /**
@@ -2083,6 +3043,40 @@ public interface KStream<K, V> {
                                      final ValueJoiner<? super V, ? super GV, ? extends RV> joiner);
 
     /**
+     * Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
+     * The join is a primary key table lookup join with join attribute
+     * {@code keyValueMapper.map(stream.keyValue) == table.key}.
+     * "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
+     * This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
+     * state.
+     * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
+     * state and will not produce any result records.
+     * <p>
+     * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable}, the provided
+     * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as the key of this {@code KStream}.
+     * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join
+     * operation and thus no output record will be added to the resulting {@code KStream}.
+     * If {@code keyValueMapper} returns {@code null} implying no match exists, no output record will be added to the
+     * resulting {@code KStream}.
+     *
+     * @param globalKTable   the {@link GlobalKTable} to be joined with this stream
+     * @param keyValueMapper instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
+     *                       to the key of the {@link GlobalKTable}
+     * @param joiner         a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param named          a {@link Named} config used to name the processor in the topology
+     * @param <GK>           the key type of {@link GlobalKTable}
+     * @param <GV>           the value type of the {@link GlobalKTable}
+     * @param <RV>           the value type of the resulting {@code KStream}
+     * @return a {@code KStream} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one output for each input {@code KStream} record
+     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+     */
+    <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
+                                     final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
+                                     final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
+                                     final Named named);
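For illustration, a minimal sketch of a named global-table join (the orders stream, the customers table, and the extractCustomerId() helper are hypothetical):

    KStream<String, String> enriched = orders.join(
        customers,                                       // GlobalKTable<String, String>
        (orderId, order) -> extractCustomerId(order),    // map stream record to table key
        (order, customer) -> order + " by " + customer,  // join result per matching pair
        Named.as("ORDER-CUSTOMER-JOIN"));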
+    /**
      * Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.
      * In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream
      * will produce an output record (cf. below).
@@ -2118,4 +3112,43 @@ public interface KStream<K, V> {
     <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                          final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                          final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);
+
+    /**
+     * Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.
+     * In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream
+     * will produce an output record (cf. below).
+     * The join is a primary key table lookup join with join attribute
+     * {@code keyValueMapper.map(stream.keyValue) == table.key}.
+     * "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
+     * This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
+     * state.
+     * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
+     * state and will not produce any result records.
+     * <p>
+     * For each {@code KStream} record, whether or not it finds a corresponding record in {@link GlobalKTable}, the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as the key of this {@code KStream}.
+     * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join
+     * operation and thus no output record will be added to the resulting {@code KStream}.
+     * If {@code keyValueMapper} returns {@code null} implying no match exists, a {@code null} value will be
+     * provided to {@link ValueJoiner}.
+     * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
+     * {@link ValueJoiner}.
+     *
+     * @param globalKTable   the {@link GlobalKTable} to be joined with this stream
+     * @param keyValueMapper instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
+     *                       to the key of the {@link GlobalKTable}
+     * @param valueJoiner    a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param named          a {@link Named} config used to name the processor in the topology
+     * @param <GK>           the key type of {@link GlobalKTable}
+     * @param <GV>           the value type of the {@link GlobalKTable}
+     * @param <RV>           the value type of the resulting {@code KStream}
+     * @return a {@code KStream} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one output for each input {@code KStream} record
+     * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
+     */
+    <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
+                                         final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
+                                         final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner,
+                                         final Named named);
 }
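And the left variant under the same hypothetical setup; here every stream record produces output, so customer may be null:

    KStream<String, String> enrichedOrNot = orders.leftJoin(
        customers,
        (orderId, order) -> extractCustomerId(order),
        (order, customer) -> order + " by " + (customer == null ? "unknown" : customer),
        Named.as("ORDER-CUSTOMER-LEFT-JOIN"));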
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 67729ec..9d76033 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.Produced;
@@ -128,47 +129,64 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     @Override
     public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate) {
-        Objects.requireNonNull(predicate, "predicate can't be null");
-        final String name = builder.newProcessorName(FILTER_NAME);
-
+        return filter(predicate, NamedInternal.empty());
+    }
 
+    @Override
+    public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named) {
+        Objects.requireNonNull(predicate, "predicate can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name);
         final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
         builder.addGraphNode(this.streamsGraphNode, filterProcessorNode);
 
-        return new KStreamImpl<>(name,
-                                 keySerde,
-                                 valSerde,
-                                 sourceNodes,
-                                 repartitionRequired,
-                                 filterProcessorNode,
-                                 builder);
+        return new KStreamImpl<>(
+                name,
+                keySerde,
+                valSerde,
+                sourceNodes,
+                repartitionRequired,
+                filterProcessorNode,
+                builder);
     }
 
     @Override
     public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
-        Objects.requireNonNull(predicate, "predicate can't be null");
-        final String name = builder.newProcessorName(FILTER_NAME);
+        return filterNot(predicate, NamedInternal.empty());
+    }
 
+    @Override
+    public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named) {
+        Objects.requireNonNull(predicate, "predicate can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name);
         final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
 
         builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode);
 
-        return new KStreamImpl<>(name,
-                                 keySerde,
-                                 valSerde,
-                                 sourceNodes,
-                                 repartitionRequired,
-                                 filterNotProcessorNode,
-                                 builder);
+        return new KStreamImpl<>(
+                name,
+                keySerde,
+                valSerde,
+                sourceNodes,
+                repartitionRequired,
+                filterNotProcessorNode,
+                builder);
     }
 
     @Override
     public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
+        return selectKey(mapper, NamedInternal.empty());
+    }
+
+    @Override
+    public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper, final Named named) {
         Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(named, "named can't be null");
 
-        final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper, NamedInternal.empty());
+        final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper, new NamedInternal(named));
 
         selectKeyProcessorNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode);
@@ -177,7 +195,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         return new KStreamImpl<>(selectKeyProcessorNode.nodeName(), null, valSerde, sourceNodes, true, selectKeyProcessorNode, builder);
     }
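A hedged sketch of the new selectKey() overload (names hypothetical); since keyChangingOperation(true) is set above, a later aggregation or join on the result will trigger a repartition:

    KStream<String, String> byCustomer = orders.selectKey(
        (orderId, order) -> extractCustomerId(order),
        Named.as("KEY-BY-CUSTOMER"));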
 
-
     private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
                                                             final NamedInternal named) {
         final String name = named.orElseGenerateWithPrefix(builder, KEY_SELECT_NAME);
@@ -190,9 +207,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     @Override
     public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
-        Objects.requireNonNull(mapper, "mapper can't be null");
-        final String name = builder.newProcessorName(MAP_NAME);
+        return map(mapper, NamedInternal.empty());
+    }
 
+    @Override
+    public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, final Named named) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMap<>(mapper), name);
 
         final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
@@ -201,26 +223,36 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
 
         // key and value serde cannot be preserved
-        return new KStreamImpl<>(name,
-                                 null,
-                                 null,
-                                 sourceNodes,
-                                 true,
-                                 mapProcessorNode,
-                                 builder);
+        return new KStreamImpl<>(
+                name,
+                null,
+                null,
+                sourceNodes,
+                true,
+                mapProcessorNode,
+                builder);
     }
 
-
     @Override
     public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper) {
         return mapValues(withKey(mapper));
     }
 
     @Override
+    public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Named named) {
+        return mapValues(withKey(mapper), named);
+    }
+
+    @Override
     public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
-        Objects.requireNonNull(mapper, "mapper can't be null");
-        final String name = builder.newProcessorName(MAPVALUES_NAME);
+        return mapValues(mapper, NamedInternal.empty());
+    }
 
+    @Override
+    public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, final Named named) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(mapper, "named can't be null");
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
         final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
 
@@ -228,13 +260,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
 
         // value serde cannot be preserved
-        return new KStreamImpl<>(name,
-                                 keySerde,
-                                 null,
-                                 sourceNodes,
-                                 repartitionRequired,
-                                 mapValuesProcessorNode,
-                                 builder);
+        return new KStreamImpl<>(
+                name,
+                keySerde,
+                null,
+                sourceNodes,
+                repartitionRequired,
+                mapValuesProcessorNode,
+                builder);
     }
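For illustration, naming a stateless mapValues() step (stream name hypothetical):

    KStream<String, Integer> lengths = input.mapValues(
        value -> value == null ? 0 : value.length(),
        Named.as("VALUE-LENGTHS"));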
 
     @Override
@@ -250,9 +283,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     @Override
     public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) {
-        Objects.requireNonNull(mapper, "mapper can't be null");
-        final String name = builder.newProcessorName(FLATMAP_NAME);
+        return flatMap(mapper, NamedInternal.empty());
+    }
 
+    @Override
+    public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper,
+                                            final Named named) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAP_NAME);
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name);
         final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name, processorParameters);
         flatMapNode.keyChangingOperation(true);
@@ -260,13 +299,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         builder.addGraphNode(this.streamsGraphNode, flatMapNode);
 
         // key and value serde cannot be preserved
-        return new KStreamImpl<>(name,
-                                 null,
-                                 null,
-                                 sourceNodes,
-                                 true,
-                                 flatMapNode,
-                                 builder);
+        return new KStreamImpl<>(name, null, null, sourceNodes, true, flatMapNode, builder);
     }
 
     @Override
@@ -275,10 +308,22 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     @Override
+    public <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper,
+                                             final Named named) {
+        return flatMapValues(withKey(mapper), named);
+    }
+
+    @Override
     public <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
-        Objects.requireNonNull(mapper, "mapper can't be null");
-        final String name = builder.newProcessorName(FLATMAPVALUES_NAME);
+        return flatMapValues(mapper, NamedInternal.empty());
+    }
 
+    @Override
+    public <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
+                                             final Named named) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAPVALUES_NAME);
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
         final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters);
 
@@ -292,6 +337,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     @Override
     @SuppressWarnings("unchecked")
     public KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates) {
+        return doBranch(NamedInternal.empty(), predicates);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates) {
+        Objects.requireNonNull(name, "name can't be null");
+        return doBranch(new NamedInternal(name), predicates);
+    }
+
+    @SuppressWarnings("unchecked")
+    private KStream<K, V>[] doBranch(final NamedInternal named,
+                                     final Predicate<? super K, ? super V>... predicates) {
         if (predicates.length == 0) {
             throw new IllegalArgumentException("you must provide at least one predicate");
         }
@@ -299,11 +357,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             Objects.requireNonNull(predicate, "predicates can't have null values");
         }
 
-        final String branchName = builder.newProcessorName(BRANCH_NAME);
+        final String branchName = named.orElseGenerateWithPrefix(builder, BRANCH_NAME);
 
         final String[] childNames = new String[predicates.length];
         for (int i = 0; i < predicates.length; i++) {
-            childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME);
+            childNames[i] = named.suffixWithOrElseGet("-predicate-" + i, builder, BRANCHCHILD_NAME);
         }
 
         final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName);
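Given the suffixing logic above, a hedged sketch of a named branch() (stream hypothetical); the child processors are named "<name>-predicate-<i>":

    KStream<String, String>[] branches = input.branch(
        Named.as("split"),
        (key, value) -> value.startsWith("A"),   // -> processor "split-predicate-0"
        (key, value) -> true);                   // -> processor "split-predicate-1"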
@@ -326,13 +384,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     @Override
     public KStream<K, V> merge(final KStream<K, V> stream) {
         Objects.requireNonNull(stream);
-        return merge(builder, stream);
+        return merge(builder, stream, NamedInternal.empty());
+    }
+
+    @Override
+    public KStream<K, V> merge(final KStream<K, V> stream, final Named processorName) {
+        Objects.requireNonNull(stream);
+        return merge(builder, stream, new NamedInternal(processorName));
     }
 
     private KStream<K, V> merge(final InternalStreamsBuilder builder,
-                                final KStream<K, V> stream) {
+                                final KStream<K, V> stream,
+                                final NamedInternal processorName) {
         final KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
-        final String name = builder.newProcessorName(MERGE_NAME);
+        final String name = processorName.orElseGenerateWithPrefix(builder, MERGE_NAME);
         final Set<String> allSourceNodes = new HashSet<>();
 
         final boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
@@ -341,9 +406,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamPassThrough<>(), name);
 
-
         final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(name, processorParameters);
-
         mergeNode.setMergeNode(true);
         builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode);
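
A usage sketch, topic names hypothetical: the pass-through merge processor takes the supplied name instead of a generated KSTREAM-MERGE-<n>.

    final KStream<String, String> left = builder.stream("topic-a");
    final KStream<String, String> right = builder.stream("topic-b");
    final KStream<String, String> merged = left.merge(right, Named.as("merged-input"));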
 
@@ -353,12 +416,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     @Override
     public void foreach(final ForeachAction<? super K, ? super V> action) {
-        Objects.requireNonNull(action, "action can't be null");
-        final String name = builder.newProcessorName(FOREACH_NAME);
+        foreach(action, NamedInternal.empty());
+    }
 
+    @Override
+    public void foreach(final ForeachAction<? super K, ? super V> action, final Named named) {
+        Objects.requireNonNull(action, "action can't be null");
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME);
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(
-            new KStreamPeek<>(action, false),
-            name
+                new KStreamPeek<>(action, false),
+                name
         );
 
         final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name, processorParameters);
@@ -367,12 +434,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     @Override
     public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
+        return peek(action, NamedInternal.empty());
+    }
+
+    @Override
+    public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action, final Named named) {
         Objects.requireNonNull(action, "action can't be null");
-        final String name = builder.newProcessorName(PEEK_NAME);
+        Objects.requireNonNull(named, "named can't be null");
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(
-            new KStreamPeek<>(action, true),
-            name
+                new KStreamPeek<>(action, true),
+                name
         );
 
         final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name, processorParameters);
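
A usage sketch, assuming a builder in scope: both callbacks now accept a Named. peek() forwards records downstream and can be chained, while foreach() is terminal.

    builder.<String, String>stream("clicks")
        .peek((key, value) -> System.out.println(key + " -> " + value), Named.as("debug-peek"))
        .foreach((key, value) -> System.out.println("consumed " + key), Named.as("log-consumed"));
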
@@ -459,56 +532,89 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         builder.addGraphNode(this.streamsGraphNode, sinkNode);
     }
 
-    private <K1, V1> KStream<K1, V1> doFlatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
-                                                     final String... stateStoreNames) {
+    @Override
+    public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
+                                              final String... stateStoreNames) {
+        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
         final String name = builder.newProcessorName(TRANSFORM_NAME);
-        final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
-            name,
-            new ProcessorParameters<>(new KStreamFlatTransform<>(transformerSupplier), name),
-            stateStoreNames
-        );
-
-        transformNode.keyChangingOperation(true);
-        builder.addGraphNode(this.streamsGraphNode, transformNode);
-
-        // cannot inherit key and value serde
-        return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
+        return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), Named.as(name), stateStoreNames);
     }
 
     @Override
     public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
+                                              final Named named,
                                               final String... stateStoreNames) {
         Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
-        return doFlatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
+        return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), named, stateStoreNames);
+    }
+
+    @Override
+    public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
+                                                  final String... stateStoreNames) {
+        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
+        final String name = builder.newProcessorName(TRANSFORM_NAME);
+        return flatTransform(transformerSupplier, Named.as(name), stateStoreNames);
     }
 
     @Override
     public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
+                                                  final Named named,
                                                   final String... stateStoreNames) {
         Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
-        return doFlatTransform(transformerSupplier, stateStoreNames);
+        Objects.requireNonNull(named, "named can't be null");
+
+        final String name = new NamedInternal(named).name();
+        final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
+                name,
+                new ProcessorParameters<>(new KStreamFlatTransform<>(transformerSupplier), name),
+                stateStoreNames
+        );
+
+        transformNode.keyChangingOperation(true);
+        builder.addGraphNode(streamsGraphNode, transformNode);
+
+        // cannot inherit key and value serde
+        return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
     }
 
     @Override
     public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                                final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+        return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), NamedInternal.empty(), stateStoreNames);
+    }
 
-        return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
+    @Override
+    public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
+                                               final Named named,
+                                               final String... stateStoreNames) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier),
+                new NamedInternal(named), stateStoreNames);
     }
 
     @Override
     public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                                final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+        return doTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames);
+    }
 
-        return doTransformValues(valueTransformerSupplier, stateStoreNames);
+    @Override
+    public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+                                               final Named named,
+                                               final String... stateStoreNames) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        return doTransformValues(valueTransformerSupplier, new NamedInternal(named), stateStoreNames);
     }
 
     private <VR> KStream<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier,
+                                                  final NamedInternal named,
                                                   final String... stateStoreNames) {
-        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
 
+        final String name = named.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
         final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
@@ -527,20 +633,39 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
                                                    final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
 
-        return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
+        return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), NamedInternal.empty(), stateStoreNames);
+    }
+
+    @Override
+    public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+                                                   final Named named,
+                                                   final String... stateStoreNames) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+
+        return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), named, stateStoreNames);
+    }
+
+    @Override
+    public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
+                                                   final String... stateStoreNames) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+
+        return doFlatTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames);
     }
 
     @Override
     public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
+                                                   final Named named,
                                                    final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
 
-        return doFlatTransformValues(valueTransformerSupplier, stateStoreNames);
+        return doFlatTransformValues(valueTransformerSupplier, named, stateStoreNames);
     }
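
A usage sketch, CSV splitting hypothetical: flatTransformValues emits zero or more values per input record, and the Named overload fixes the processor name.

    final KStream<String, String> fields = builder.<String, String>stream("rows")
        .flatTransformValues(() -> new ValueTransformer<String, Iterable<String>>() {
            @Override public void init(final ProcessorContext context) { }
            @Override public Iterable<String> transform(final String value) {
                return Arrays.asList(value.split(","));
            }
            @Override public void close() { }
        }, Named.as("split-fields"));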
 
     private <VR> KStream<K, VR> doFlatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier,
+                                                      final Named named,
                                                       final String... stateStoreNames) {
-        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
 
         final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
             name,
@@ -561,11 +686,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
         Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
         final String name = builder.newProcessorName(PROCESSOR_NAME);
+        process(processorSupplier, Named.as(name), stateStoreNames);
+    }
+
+    @Override
+    public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
+                        final Named named,
+                        final String... stateStoreNames) {
+        Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
+        Objects.requireNonNull(named, "named can't be null");
 
+        final String name = new NamedInternal(named).name();
         final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
-            name,
-            new ProcessorParameters<>(processorSupplier, name),
-            stateStoreNames
+                name,
+                new ProcessorParameters<>(processorSupplier, name),
+                stateStoreNames
         );
 
         builder.addGraphNode(this.streamsGraphNode, processNode);
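
A usage sketch with a no-op processor, names hypothetical (assumes an import of org.apache.kafka.streams.processor.AbstractProcessor): process() is terminal and registers the supplied processor under the given name.

    builder.<String, String>stream("events")
        .process(() -> new AbstractProcessor<String, String>() {
            @Override
            public void process(final String key, final String value) {
                // side effects go here
            }
        }, Named.as("audit-processor"));
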
@@ -621,14 +756,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other;
 
         final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
-        final String name = joinedInternal.name();
+        final NamedInternal name = new NamedInternal(joinedInternal.name());
         if (joinThis.repartitionRequired) {
-            final String leftJoinRepartitionTopicName = name != null ? name + "-left" : joinThis.name;
+            final String joinThisName = joinThis.name;
+            final String leftJoinRepartitionTopicName = name.suffixWithOrElseGet("-left", joinThisName);
             joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde());
         }
 
         if (joinOther.repartitionRequired) {
-            final String rightJoinRepartitionTopicName = name != null ? name + "-right" : joinOther.name;
+            final String joinOtherName = joinOther.name;
+            final String rightJoinRepartitionTopicName = name.suffixWithOrElseGet("-right", joinOtherName);
             joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde());
         }
 
@@ -779,26 +916,45 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
                                             final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
                                             final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
-        return globalTableJoin(globalTable, keyMapper, joiner, false);
+        return globalTableJoin(globalTable, keyMapper, joiner, false, NamedInternal.empty());
+    }
+
+    @Override
+    public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
+                                            final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+                                            final ValueJoiner<? super V, ? super VG, ? extends VR> joiner,
+                                            final Named named) {
+        return globalTableJoin(globalTable, keyMapper, joiner, false, named);
     }
 
     @Override
     public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable,
                                                 final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
                                                 final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
-        return globalTableJoin(globalTable, keyMapper, joiner, true);
+        return globalTableJoin(globalTable, keyMapper, joiner, true, NamedInternal.empty());
     }
 
+    @Override
+    public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable,
+                                                final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+                                                final ValueJoiner<? super V, ? super VG, ? extends VR> joiner,
+                                                final Named named) {
+        return globalTableJoin(globalTable, keyMapper, joiner, true, named);
+    }
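
A usage sketch, topic names and join logic hypothetical (assumes an import of GlobalKTable): the Named overloads also cover stream-GlobalKTable joins.

    final GlobalKTable<String, String> customers = builder.globalTable("customers");
    final KStream<String, String> enriched = builder.<String, String>stream("orders")
        .join(customers,
              (orderKey, order) -> order.substring(0, 4),    // hypothetical key extractor
              (order, customer) -> order + "/" + customer,
              Named.as("enrich-with-customer"));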
+
     private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> globalTable,
                                                         final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
                                                         final ValueJoiner<? super V, ? super VG, ? extends VR> joiner,
-                                                        final boolean leftJoin) {
+                                                        final boolean leftJoin,
+                                                        final Named named) {
         Objects.requireNonNull(globalTable, "globalTable can't be null");
         Objects.requireNonNull(keyMapper, "keyMapper can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
+        Objects.requireNonNull(named, "named can't be null");
 
         final KTableValueGetterSupplier<KG, VG> valueGetterSupplier = ((GlobalKTableImpl<KG, VG>) globalTable).valueGetterSupplier();
-        final String name = builder.newProcessorName(LEFTJOIN_NAME);
+        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, LEFTJOIN_NAME);
 
         final ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(
             valueGetterSupplier,
@@ -828,7 +984,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
         final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
 
-        final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+        final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
+        final NamedInternal renamed = new NamedInternal(joinedInternal.name());
+
+        final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
             ((KTableImpl<K, ?, VO>) other).valueGetterSupplier(),
             joiner,
@@ -945,12 +1104,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
                                                    final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
                                                    final JoinWindows windows,
                                                    final Joined<K1, V1, V2> joined) {
-            final String thisWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
-            final String otherWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
-            final String joinThisName = rightOuter ? builder.newProcessorName(OUTERTHIS_NAME) : builder.newProcessorName(JOINTHIS_NAME);
-            final String joinOtherName = leftOuter ? builder.newProcessorName(OUTEROTHER_NAME) : builder.newProcessorName(JOINOTHER_NAME);
-            final String joinMergeName = builder.newProcessorName(MERGE_NAME);
 
+            final JoinedInternal<K1, V1, V2> joinedInternal = new JoinedInternal<>(joined);
+            final NamedInternal renamed = new NamedInternal(joinedInternal.name());
+
+            final String thisWindowStreamName = renamed.suffixWithOrElseGet(
+                    "-this-windowed", builder, WINDOWED_NAME);
+            final String otherWindowStreamName = renamed.suffixWithOrElseGet(
+                    "-other-windowed", builder, WINDOWED_NAME);
+
+            final String joinThisName = rightOuter ?
+                    renamed.suffixWithOrElseGet("-outer-this-join", builder, OUTERTHIS_NAME)
+                    : renamed.suffixWithOrElseGet("-this-join", builder, JOINTHIS_NAME);
+            final String joinOtherName = leftOuter ?
+                    renamed.suffixWithOrElseGet("-outer-other-join", builder, OUTEROTHER_NAME)
+                    : renamed.suffixWithOrElseGet("-other-join", builder, JOINOTHER_NAME);
+            final String joinMergeName = renamed.suffixWithOrElseGet(
+                    "-merge", builder, MERGE_NAME);
             final StreamsGraphNode thisStreamsGraphNode = ((AbstractStream) lhs).streamsGraphNode;
             final StreamsGraphNode otherStreamsGraphNode = ((AbstractStream) other).streamsGraphNode;
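
A usage sketch, assuming two KStreams left and right plus imports of JoinWindows, Joined and java.time.Duration: supplying Joined.as("joined-stream") yields the derived names joined-stream-this-windowed, joined-stream-other-windowed, joined-stream-this-join, joined-stream-other-join and joined-stream-merge, as the expected topologies in the tests below confirm.

    final KStream<String, String> joined = left.join(
        right,
        (v1, v2) -> v1 + v2,
        JoinWindows.of(Duration.ofMinutes(5)),
        Joined.as("joined-stream"));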
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
index d478e9b..532928a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.kstream.Named;
 public class NamedInternal extends Named {
 
     public static NamedInternal empty() {
-        return new NamedInternal(null);
+        return new NamedInternal((String) null);
     }
 
     public static NamedInternal with(final String name) {
@@ -31,6 +31,15 @@ public class NamedInternal extends Named {
     /**
      * Creates a new {@link NamedInternal} instance.
      *
+     * @param internal  the internal {@link Named} instance.
+     */
+    NamedInternal(final Named internal) {
+        super(internal);
+    }
+
+    /**
+     * Creates a new {@link NamedInternal} instance.
+     *
      * @param internal the internal name.
      */
     NamedInternal(final String internal) {
@@ -48,6 +57,14 @@ public class NamedInternal extends Named {
     public NamedInternal withName(final String name) {
         return new NamedInternal(name);
     }
+
+    String suffixWithOrElseGet(final String suffix, final String other) {
+        if (name != null) {
+            return name + suffix;
+        } else {
+            return other;
+        }
+    }
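
An illustration of the new String-based fallback (the generated name shown is illustrative):

    // new NamedInternal("split").suffixWithOrElseGet("-predicate-0", "KSTREAM-BRANCHCHILD-0000000003")
    //     returns "split-predicate-0";
    // NamedInternal.empty().suffixWithOrElseGet("-predicate-0", "KSTREAM-BRANCHCHILD-0000000003")
    //     returns "KSTREAM-BRANCHCHILD-0000000003".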
 
     String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) {
         // We actually do not need to generate a processor name for the operation if a name is specified.
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 93d444b..49477b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -24,10 +24,16 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
@@ -39,9 +45,9 @@ import org.apache.kafka.test.MockPredicate;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Assert;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -59,6 +65,8 @@ public class StreamsBuilderTest {
 
     private static final String STREAM_TOPIC     = "stream-topic";
 
+    private static final String STREAM_OPERATION_NAME = "stream-operation";
+
     private static final String STREAM_TOPIC_TWO = "stream-topic-two";
 
     private static final String TABLE_TOPIC      = "table-topic";
@@ -470,16 +478,243 @@ public class StreamsBuilderTest {
         assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000002");
     }
 
-    private void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) {
+    @Test
+    public void shouldUseSpecifiedNameForMapOperation() {
+        builder.stream(STREAM_TOPIC).map(KeyValue::pair, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForMapValuesOperation() {
+        builder.stream(STREAM_TOPIC).mapValues(v -> v, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForMapValuesWithKeyOperation() {
+        builder.stream(STREAM_TOPIC).mapValues((k, v) -> v, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForFilterOperation() {
+        builder.stream(STREAM_TOPIC).filter((k, v) -> true, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForForEachOperation() {
+        builder.stream(STREAM_TOPIC).foreach((k, v) -> { }, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForTransform() {
+        builder.stream(STREAM_TOPIC).transform(() -> null, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseSpecifiedNameForTransformValues() {
+        builder.stream(STREAM_TOPIC).transformValues(() -> (ValueTransformer) null, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void shouldUseSpecifiedNameForTransformValuesWithKey() {
+        builder.stream(STREAM_TOPIC).transformValues(() -> (ValueTransformerWithKey) null, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void shouldUseSpecifiedNameForBranchOperation() {
+        builder.stream(STREAM_TOPIC)
+                .branch(Named.as("branch-processor"), (k, v) -> true, (k, v) -> false);
+
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology,
+                "KSTREAM-SOURCE-0000000000",
+                "branch-processor",
+                "branch-processor-predicate-0",
+                "branch-processor-predicate-1");
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKTable() {
+        final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+        final KTable<String, String> streamTwo = builder.table("table-topic");
+        streamOne.join(streamTwo, (value1, value2) -> value1, Joined.as(STREAM_OPERATION_NAME));
+        builder.build();
+
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology,
+                "KSTREAM-SOURCE-0000000000",
+                "KSTREAM-SOURCE-0000000002",
+                "KTABLE-SOURCE-0000000003",
+                STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForLeftJoinOperationBetweenKStreamAndKTable() {
+        final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+        final KTable<String, String> streamTwo = builder.table(STREAM_TOPIC_TWO);
+        streamOne.leftJoin(streamTwo, (value1, value2) -> value1, Joined.as(STREAM_OPERATION_NAME));
+        builder.build();
+
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology,
+                "KSTREAM-SOURCE-0000000000",
+                "KSTREAM-SOURCE-0000000002",
+                "KTABLE-SOURCE-0000000003",
+                STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForLeftJoinOperationBetweenKStreamAndKStream() {
+        final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+        final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
+
+        streamOne.leftJoin(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), Joined.as(STREAM_OPERATION_NAME));
+        builder.build();
+
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForStateStore(topology.stateStores(),
+                STREAM_OPERATION_NAME + "-this-join-store", STREAM_OPERATION_NAME + "-outer-other-join-store"
+        );
+        assertSpecifiedNameForOperation(topology,
+                "KSTREAM-SOURCE-0000000000",
+                "KSTREAM-SOURCE-0000000001",
+                STREAM_OPERATION_NAME + "-this-windowed",
+                STREAM_OPERATION_NAME + "-other-windowed",
+                STREAM_OPERATION_NAME + "-this-join",
+                STREAM_OPERATION_NAME + "-outer-other-join",
+                STREAM_OPERATION_NAME + "-merge");
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKStream() {
+        final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+        final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
+
+        streamOne.join(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), Joined.as(STREAM_OPERATION_NAME));
+        builder.build();
+
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForStateStore(topology.stateStores(),
+                STREAM_OPERATION_NAME + "-this-join-store",
+                STREAM_OPERATION_NAME + "-other-join-store"
+        );
+        assertSpecifiedNameForOperation(topology,
+                "KSTREAM-SOURCE-0000000000",
+                "KSTREAM-SOURCE-0000000001",
+                STREAM_OPERATION_NAME + "-this-windowed",
+                STREAM_OPERATION_NAME + "-other-windowed",
+                STREAM_OPERATION_NAME + "-this-join",
+                STREAM_OPERATION_NAME + "-other-join",
+                STREAM_OPERATION_NAME + "-merge");
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForOuterJoinOperationBetweenKStreamAndKStream() {
+        final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+        final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
+
+        streamOne.outerJoin(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), Joined.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForStateStore(topology.stateStores(),
+                STREAM_OPERATION_NAME + "-outer-this-join-store",
+                STREAM_OPERATION_NAME + "-outer-other-join-store");
+        assertSpecifiedNameForOperation(topology,
+                "KSTREAM-SOURCE-0000000000",
+                "KSTREAM-SOURCE-0000000001",
+                STREAM_OPERATION_NAME + "-this-windowed",
+                STREAM_OPERATION_NAME + "-other-windowed",
+                STREAM_OPERATION_NAME + "-outer-this-join",
+                STREAM_OPERATION_NAME + "-outer-other-join",
+                STREAM_OPERATION_NAME + "-merge");
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForMergeOperation() {
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
+
+        final KStream<String, String> source1 = builder.stream(topic1);
+        final KStream<String, String> source2 = builder.stream(topic2);
+        source1.merge(source2, Named.as("merge-processor"));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "merge-processor");
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForProcessOperation() {
+        builder.stream(STREAM_TOPIC)
+                .process(() -> null, Named.as("test-processor"));
+
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", "test-processor");
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForPrintOperation() {
+        builder.stream(STREAM_TOPIC).print(Printed.toSysOut().withName("print-processor"));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", "print-processor");
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void shouldUseSpecifiedNameForFlatTransformValueOperation() {
+        builder.stream(STREAM_TOPIC).flatTransformValues(() -> (ValueTransformer) null, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void shouldUseSpecifiedNameForFlatTransformValueWithKeyOperation() {
+        builder.stream(STREAM_TOPIC).flatTransformValues(() -> (ValueTransformerWithKey) null, Named.as(STREAM_OPERATION_NAME));
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+    }
+
+    private static void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) {
         final List<ProcessorNode> processors = topology.processors();
-        Assert.assertEquals("Invalid number of expected processors", expected.length, processors.size());
+        assertEquals("Invalid number of expected processors", expected.length, processors.size());
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], processors.get(i).name());
         }
     }
 
-    private void assertSpecifiedNameForStateStore(final List<StateStore> stores, final String... expected) {
-        Assert.assertEquals("Invalid number of expected state stores", expected.length, stores.size());
+    private static void assertSpecifiedNameForStateStore(final List<StateStore> stores, final String... expected) {
+        assertEquals("Invalid number of expected state stores", expected.length, stores.size());
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], stores.get(i).name());
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 3c7e8c0..e621ffc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -550,44 +550,44 @@ public class RepartitionTopicNamingTest {
             "      --> KTABLE-TOSTREAM-0000000011\n" +
             "      <-- KSTREAM-SOURCE-0000000041\n" +
             "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" +
-            "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" +
+            "      --> joined-stream-other-windowed, KSTREAM-SINK-0000000012\n" +
             "      <-- KSTREAM-AGGREGATE-0000000007\n" +
             "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" +
             "      --> KSTREAM-PEEK-0000000021\n" +
             "      <-- KSTREAM-SOURCE-0000000041\n" +
             "    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" +
-            "      --> KSTREAM-WINDOWED-0000000033\n" +
+            "      --> joined-stream-this-windowed\n" +
             "      <-- KSTREAM-SOURCE-0000000041\n" +
             "    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" +
             "      --> KSTREAM-REDUCE-0000000023\n" +
             "      <-- KSTREAM-FILTER-0000000020\n" +
-            "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
-            "      --> KSTREAM-JOINTHIS-0000000035\n" +
-            "      <-- KSTREAM-FILTER-0000000029\n" +
-            "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
-            "      --> KSTREAM-JOINOTHER-0000000036\n" +
+            "    Processor: joined-stream-other-windowed (stores: [joined-stream-other-join-store])\n" +
+            "      --> joined-stream-other-join\n" +
             "      <-- KTABLE-TOSTREAM-0000000011\n" +
+            "    Processor: joined-stream-this-windowed (stores: [joined-stream-this-join-store])\n" +
+            "      --> joined-stream-this-join\n" +
+            "      <-- KSTREAM-FILTER-0000000029\n" +
             "    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" +
             "      --> KTABLE-TOSTREAM-0000000018\n" +
             "      <-- KSTREAM-SOURCE-0000000041\n" +
-            "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
-            "      --> KSTREAM-MERGE-0000000037\n" +
-            "      <-- KSTREAM-WINDOWED-0000000034\n" +
-            "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
-            "      --> KSTREAM-MERGE-0000000037\n" +
-            "      <-- KSTREAM-WINDOWED-0000000033\n" +
             "    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" +
             "      --> KTABLE-TOSTREAM-0000000027\n" +
             "      <-- KSTREAM-PEEK-0000000021\n" +
-            "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" +
-            "      --> KSTREAM-SINK-0000000038\n" +
-            "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" +
+            "    Processor: joined-stream-other-join (stores: [joined-stream-this-join-store])\n" +
+            "      --> joined-stream-merge\n" +
+            "      <-- joined-stream-other-windowed\n" +
+            "    Processor: joined-stream-this-join (stores: [joined-stream-other-join-store])\n" +
+            "      --> joined-stream-merge\n" +
+            "      <-- joined-stream-this-windowed\n" +
             "    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" +
             "      --> KSTREAM-SINK-0000000019\n" +
             "      <-- KSTREAM-AGGREGATE-0000000014\n" +
             "    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" +
             "      --> KSTREAM-SINK-0000000028\n" +
             "      <-- KSTREAM-REDUCE-0000000023\n" +
+            "    Processor: joined-stream-merge (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000038\n" +
+            "      <-- joined-stream-this-join, joined-stream-other-join\n" +
             "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" +
             "      <-- KTABLE-TOSTREAM-0000000011\n" +
             "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" +
@@ -595,7 +595,7 @@ public class RepartitionTopicNamingTest {
             "    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" +
             "      <-- KTABLE-TOSTREAM-0000000027\n" +
             "    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n" +
-            "      <-- KSTREAM-MERGE-0000000037\n\n";
+            "      <-- joined-stream-merge\n\n";
 
 
     private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" +
@@ -651,29 +651,29 @@ public class RepartitionTopicNamingTest {
             "      --> KTABLE-TOSTREAM-0000000011\n" +
             "      <-- KSTREAM-SOURCE-0000000010\n" +
             "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" +
-            "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" +
+            "      --> KSTREAM-SINK-0000000012, joined-stream-other-windowed\n" +
             "      <-- KSTREAM-AGGREGATE-0000000007\n" +
             "    Source: KSTREAM-SOURCE-0000000032 (topics: [joined-stream-left-repartition])\n" +
-            "      --> KSTREAM-WINDOWED-0000000033\n" +
-            "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
-            "      --> KSTREAM-JOINTHIS-0000000035\n" +
-            "      <-- KSTREAM-SOURCE-0000000032\n" +
-            "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
-            "      --> KSTREAM-JOINOTHER-0000000036\n" +
+            "      --> joined-stream-this-windowed\n" +
+            "    Processor: joined-stream-other-windowed (stores: [joined-stream-other-join-store])\n" +
+            "      --> joined-stream-other-join\n" +
             "      <-- KTABLE-TOSTREAM-0000000011\n" +
-            "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
-            "      --> KSTREAM-MERGE-0000000037\n" +
-            "      <-- KSTREAM-WINDOWED-0000000034\n" +
-            "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
-            "      --> KSTREAM-MERGE-0000000037\n" +
-            "      <-- KSTREAM-WINDOWED-0000000033\n" +
-            "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" +
+            "    Processor: joined-stream-this-windowed (stores: [joined-stream-this-join-store])\n" +
+            "      --> joined-stream-this-join\n" +
+            "      <-- KSTREAM-SOURCE-0000000032\n" +
+            "    Processor: joined-stream-other-join (stores: [joined-stream-this-join-store])\n" +
+            "      --> joined-stream-merge\n" +
+            "      <-- joined-stream-other-windowed\n" +
+            "    Processor: joined-stream-this-join (stores: [joined-stream-other-join-store])\n" +
+            "      --> joined-stream-merge\n" +
+            "      <-- joined-stream-this-windowed\n" +
+            "    Processor: joined-stream-merge (stores: [])\n" +
             "      --> KSTREAM-SINK-0000000038\n" +
-            "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" +
+            "      <-- joined-stream-this-join, joined-stream-other-join\n" +
             "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" +
             "      <-- KTABLE-TOSTREAM-0000000011\n" +
             "    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n" +
-            "      <-- KSTREAM-MERGE-0000000037\n" +
+            "      <-- joined-stream-merge\n" +
             "\n" +
             "  Sub-topology: 2\n" +
             "    Source: KSTREAM-SOURCE-0000000017 (topics: [aggregate-stream-repartition])\n" +