Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/02 06:10:01 UTC

[jira] [Commented] (KAFKA-7406) Naming Join and Grouping Repartition Topics

    [ https://issues.apache.org/jira/browse/KAFKA-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635062#comment-16635062 ] 

ASF GitHub Bot commented on KAFKA-7406:
---------------------------------------

mjsax closed pull request #5709: KAFKA-7406: Name join group repartition topics
URL: https://github.com/apache/kafka/pull/5709
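
For orientation, the topic naming scheme that the Javadoc in this diff describes
("${applicationId}-<name>-repartition") can be illustrated in isolation. The
following is a minimal sketch and not code from the PR: the class
RepartitionTopicNameSketch, the method repartitionTopicName, and the sample values
are hypothetical, and the sketch assumes a name has been supplied by the user (for
example via Grouped.as(String) or Joined.named(String)) rather than generated
internally.

```java
public class RepartitionTopicNameSketch {

    // Hypothetical helper, not part of Kafka Streams. It only mirrors the
    // "${applicationId}-<name>-repartition" pattern stated in the Javadoc:
    // the application id, then the user-supplied name, then the fixed suffix.
    public static String repartitionTopicName(final String applicationId, final String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(final String[] args) {
        // Sample values are assumptions chosen for illustration only.
        System.out.println(repartitionTopicName("my-app", "orders-by-customer"));
    }
}
```

Running main prints my-app-orders-by-customer-repartition. As the Javadoc notes,
Kafka Streams does not always create a repartition topic, so the name only
matters when repartitioning is actually required.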

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
new file mode 100644
index 00000000000..404cbd48501
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * The class that is used to capture the key and value {@link Serde}s and set the part of the name used for
+ * repartition topics when performing {@link KStream#groupBy(KeyValueMapper, Grouped)}, {@link
+ * KStream#groupByKey(Grouped)}, or {@link KTable#groupBy(KeyValueMapper, Grouped)} operations.  Note
+ * that Kafka Streams does not always create a repartition topic for grouping operations.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class Grouped<K, V> {
+
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valueSerde;
+    protected final String name;
+
+
+    private Grouped(final String name,
+                    final Serde<K> keySerde,
+                    final Serde<V> valueSerde) {
+        this.name = name;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    protected Grouped(final Grouped<K, V> grouped) {
+        this(grouped.name, grouped.keySerde, grouped.valueSerde);
+    }
+
+    /**
+     * Create a {@code Grouped} instance with the provided name used for a repartition topic required for
+     * performing the grouping operation.
+     *
+     * @param name the name used for a repartition topic if required
+     * @return a new {@link Grouped} configured with the name
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> as(final String name) {
+        return new Grouped<>(name, null, null);
+    }
+
+
+    /**
+     * Create a {@code Grouped} instance with the provided keySerde.
+     *
+     * @param keySerde the Serde used for serializing the key
+     * @return a new {@link Grouped} configured with the keySerde
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> keySerde(final Serde<K> keySerde) {
+        return new Grouped<>(null, keySerde, null);
+    }
+
+
+    /**
+     * Create a {@code Grouped} instance with the provided valueSerde.
+     *
+     * @param valueSerde the Serde used for serializing the value
+     * @return a new {@link Grouped} configured with the valueSerde
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> valueSerde(final Serde<V> valueSerde) {
+        return new Grouped<>(null, null, valueSerde);
+    }
+
+    /**
+     * Create a {@code Grouped} instance with the provided {@code name}, {@code keySerde}, and {@code valueSerde}.
+     *
+     * @param name       the name used for part of the repartition topic name if required
+     * @param keySerde   the Serde used for serializing the key
+     * @param valueSerde the Serde used for serializing the value
+     * @return a new {@link Grouped} configured with the name, keySerde, and valueSerde
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> with(final String name,
+                                            final Serde<K> keySerde,
+                                            final Serde<V> valueSerde) {
+        return new Grouped<>(name, keySerde, valueSerde);
+    }
+
+
+    /**
+     * Create a {@code Grouped} instance with the provided {@code keySerde} and {@code valueSerde}.
+     *
+     * @param keySerde   the Serde used for serializing the key
+     * @param valueSerde the Serde used for serializing the value
+     * @return a new {@link Grouped} configured with the keySerde and valueSerde
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> with(final Serde<K> keySerde,
+                                            final Serde<V> valueSerde) {
+        return new Grouped<>(null, keySerde, valueSerde);
+    }
+
+    /**
+     * Perform the grouping operation with the name for a repartition topic if required.  Note
+     * that Kafka Streams does not always create a repartition topic for grouping operations.
+     *
+     * @param name the name used for part of the repartition topic if required
+     * @return a new {@link Grouped} instance configured with the {@code name}
+     */
+    public Grouped<K, V> withName(final String name) {
+        return new Grouped<>(name, keySerde, valueSerde);
+    }
+
+    /**
+     * Perform the grouping operation using the provided keySerde for serializing the key.
+     *
+     * @param keySerde Serde to use for serializing the key
+     * @return a new {@code Grouped} instance configured with the {@code keySerde}
+     */
+    public Grouped<K, V> withKeySerde(final Serde<K> keySerde) {
+        return new Grouped<>(name, keySerde, valueSerde);
+    }
+
+    /**
+     * Perform the grouping operation using the provided valueSerde for serializing the value.
+     *
+     * @param valueSerde Serde to use for serializing the value
+     * @return a new {@code Grouped} instance configured with the {@code valueSerde}
+     */
+    public Grouped<K, V> withValueSerde(final Serde<V> valueSerde) {
+        return new Grouped<>(name, keySerde, valueSerde);
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
index 8601e1c6ad2..aa29c6805b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
@@ -24,16 +24,20 @@
  */
 public class Joined<K, V, VO> {
 
-    private Serde<K> keySerde;
-    private Serde<V> valueSerde;
-    private Serde<VO> otherValueSerde;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private final Serde<VO> otherValueSerde;
+    private final String name;
+
 
     private Joined(final Serde<K> keySerde,
                    final Serde<V> valueSerde,
-                   final Serde<VO> otherValueSerde) {
+                   final Serde<VO> otherValueSerde,
+                   final String name) {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.otherValueSerde = otherValueSerde;
+        this.name = name;
     }
 
     /**
@@ -51,7 +55,32 @@ private Joined(final Serde<K> keySerde,
     public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
                                                    final Serde<V> valueSerde,
                                                    final Serde<VO> otherValueSerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, null);
+    }
+
+    /**
+     * Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances.
+     * {@code null} values are accepted and will be replaced by the default serdes as defined in
+     * config.
+     *
+     * @param keySerde the key serde to use. If {@code null} the default key serde from config will be
+     * used
+     * @param valueSerde the value serde to use. If {@code null} the default value serde from config
+     * will be used
+     * @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde
+     * from config will be used
+     * @param name the name used as the base for naming components of the join including any
+     * repartition topics
+     * @param <K> key type
+     * @param <V> value type
+     * @param <VO> other value type
+     * @return new {@code Joined} instance with the provided serdes
+     */
+    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
+                                                   final Serde<V> valueSerde,
+                                                   final Serde<VO> otherValueSerde,
+                                                   final String name) {
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
     }
 
     /**
@@ -65,7 +94,7 @@ private Joined(final Serde<K> keySerde,
      * @return new {@code Joined} instance configured with the keySerde
      */
     public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde) {
-        return with(keySerde, null, null);
+        return new Joined<>(keySerde, null, null, null);
     }
 
     /**
@@ -79,7 +108,7 @@ private Joined(final Serde<K> keySerde,
      * @return new {@code Joined} instance configured with the valueSerde
      */
     public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde) {
-        return with(null, valueSerde, null);
+        return new Joined<>(null, valueSerde, null, null);
     }
 
     /**
@@ -93,19 +122,34 @@ private Joined(final Serde<K> keySerde,
      * @return new {@code Joined} instance configured with the otherValueSerde
      */
     public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherValueSerde) {
-        return with(null, null, otherValueSerde);
+        return new Joined<>(null, null, otherValueSerde, null);
+    }
+
+    /**
+     * Create an instance of {@code Joined} with a base name for all components of the join; this may
+     * include any repartition topics created to complete the join.
+     *
+     * @param name the name used as the base for naming components of the join including any
+     * repartition topics
+     * @param <K> key type
+     * @param <V> value type
+     * @param <VO> other value type
+     * @return new {@code Joined} instance configured with the name
+     */
+    public static <K, V, VO> Joined<K, V, VO> named(final String name) {
+        return new Joined<>(null, null, null, name);
     }
 
+
     /**
      * Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default
      * key serde as defined in config
      *
      * @param keySerde the key serde to use. If null the default key serde from config will be used
-     * @return this
+     * @return new {@code Joined} instance configured with the {@code keySerde}
      */
     public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) {
-        this.keySerde = keySerde;
-        return this;
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
     }
 
     /**
@@ -113,11 +157,10 @@ private Joined(final Serde<K> keySerde,
      * value serde as defined in config
      *
      * @param valueSerde the value serde to use. If null the default value serde from config will be used
-     * @return this
+     * @return new {@code Joined} instance configured with the {@code valueSerde}
      */
     public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) {
-        this.valueSerde = valueSerde;
-        return this;
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
     }
 
     /**
@@ -125,11 +168,22 @@ private Joined(final Serde<K> keySerde,
      * value serde as defined in config
      *
      * @param otherValueSerde the otherValue serde to use. If null the default value serde from config will be used
-     * @return this
+     * @return new {@code Joined} instance configured with the {@code otherValueSerde}
      */
     public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
-        this.otherValueSerde = otherValueSerde;
-        return this;
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+    }
+
+    /**
+     * Set the base name used for all components of the join; this may include any repartition topics
+     * created to complete the join.
+     *
+     * @param name the name used as the base for naming components of the join including any
+     * repartition topics
+     * @return new {@code Joined} instance configured with the {@code name}
+     */
+    public Joined<K, V, VO> withName(final String name) {
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
     }
 
     public Serde<K> keySerde() {
@@ -143,4 +197,8 @@ private Joined(final Serde<K> keySerde,
     public Serde<VO> otherValueSerde() {
         return otherValueSerde;
     }
+
+    public String name() {
+        return name;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index cf2ce75450e..13f1dfc53c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -770,11 +770,12 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
      * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
      * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
-     * {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later
+     * operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * <p>
@@ -782,7 +783,7 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
      * correctly on its key.
      * If the last key changing operator changed the key type, it is recommended to use
-     * {@link #groupByKey(Serialized)} instead.
+     * {@link #groupByKey(org.apache.kafka.streams.kstream.Grouped)} instead.
      *
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
      * @see #groupBy(KeyValueMapper)
@@ -799,11 +800,12 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
      * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
      * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
-     * {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka
+     * if a later operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * <p>
@@ -813,22 +815,58 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      *
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
      * @see #groupBy(KeyValueMapper)
+     *
+     * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.KStream#groupByKey(Grouped)} instead
      */
+    @Deprecated
     KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized);
 
+    /**
+     * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
+     * and using the serializers as defined by {@link Grouped}.
+     * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
+     * (cf. {@link KGroupedStream}).
+     * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
+     * <p>
+     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
+     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
+     * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
+     * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later operator
+     * depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, &lt;name&gt; is
+     * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name,
+     * and "-repartition" is a fixed suffix.
+     *
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * <p>
+     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
+     * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
+     * correctly on its key.
+     *
+     * @param  grouped  the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
+     *                  and part of the name for a repartition topic if repartitioning is required.
+     * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
+     * @see #groupBy(KeyValueMapper)
+     */
+    KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);
+
     /**
      * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
      * and default serializers and deserializers.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
-     * The {@link KeyValueMapper} selects a new key (which should be of the same type) while preserving the original values.
+     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}
      * <p>
-     * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
+     * later operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * <p>
@@ -849,14 +887,15 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * and {@link Serde}s as specified by {@link Serialized}.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
-     * The {@link KeyValueMapper} selects a new key (which should be of the same type) while preserving the original values.
+     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
      * <p>
-     * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
+     * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
+     * later operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
      * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
      * an internally generated name, and "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * <p>
@@ -868,10 +907,46 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * @param selector a {@link KeyValueMapper} that computes a new key for grouping
      * @param <KR>     the key type of the result {@link KGroupedStream}
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
+     *
+     * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.KStream#groupBy(KeyValueMapper, Grouped)} instead
      */
+    @Deprecated
     <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                        final Serialized<KR, V> serialized);
 
+    /**
+     * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
+     * and {@link Serde}s as specified by {@link Grouped}.
+     * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
+     * (cf. {@link KGroupedStream}).
+     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
+     * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
+     * <p>
+     * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
+     * operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
+     * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * <p>
+     * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
+     * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
+     * <p>
+     * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
+     *
+     * @param selector a {@link KeyValueMapper} that computes a new key for grouping
+     * @param grouped  the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
+     *                 and part of the name for a repartition topic if repartitioning is required.
+     * @param <KR>     the key type of the result {@link KGroupedStream}
+     * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
+     */
+    <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
+                                       final Grouped<KR, V> grouped);
+
+
+
     /**
      * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default
      * serializers and deserializers.
@@ -932,7 +1007,7 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
      * internally generated name, and "-changelog" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1076,9 +1151,9 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1157,9 +1232,9 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1241,9 +1316,9 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1323,9 +1398,9 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1408,9 +1483,9 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -1484,9 +1559,9 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -1566,9 +1641,9 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -1645,9 +1720,9 @@ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
      * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 293bc6b7a86..f9989e83590 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -23,8 +23,8 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -562,8 +562,8 @@
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
      * <p>
      * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -590,12 +590,12 @@
      * provided {@link KeyValueMapper}.
      * Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedTable}).
-     * The {@link KeyValueMapper} selects a new key and value (with should both have unmodified type).
+     * The {@link KeyValueMapper} selects a new key and value (each of which may keep its type or change to a new type).
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
      * <p>
      * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -610,10 +610,46 @@
      * @param <KR>          the key type of the result {@link KGroupedTable}
      * @param <VR>          the value type of the result {@link KGroupedTable}
      * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
+     *
+     * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.KTable#groupBy(KeyValueMapper, Grouped)} instead
      */
+    @Deprecated
     <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                            final Serialized<KR, VR> serialized);
 
+    /**
+     * Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}
+     * and {@link Serde}s as specified by {@link Grouped}.
+     * Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the
+     * provided {@link KeyValueMapper}.
+     * Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data
+     * (cf. {@link KGroupedTable}).
+     * The {@link KeyValueMapper} selects a new key and value (where both could be the same type or a new type).
+     * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
+     * <p>
+     * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
+     * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
+     *
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * <p>
+     * All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
+     * records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned
+     * on the new key.
+     *
+     * @param selector      a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
+     * @param grouped       the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
+     *                      and the name for a repartition topic if repartitioning is required.
+     * @param <KR>          the key type of the result {@link KGroupedTable}
+     * @param <VR>          the value type of the result {@link KGroupedTable}
+     * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
+     */
+    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
+                                           final Grouped<KR, VR> grouped);
+
     /**
      * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join,
      * with default serializers, deserializers, and state store.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
index 0b04696d074..df9c9a111fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
@@ -24,7 +24,10 @@
  *
  * @param <K> the key type
  * @param <V> the value type
+ *
+ * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.Grouped} instead
  */
+@Deprecated
 public class Serialized<K, V> {
 
     protected final Serde<K> keySerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
new file mode 100644
index 00000000000..2360fc64464
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Grouped;
+
+public class GroupedInternal<K, V> extends Grouped<K, V> {
+
+    GroupedInternal(final Grouped<K, V> grouped) {
+        super(grouped);
+    }
+
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    public String name() {
+        return name;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 9791db6495d..3439cf591be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -36,6 +36,7 @@
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final boolean repartitionRequired;
+    private final String userName;
     private final Set<String> sourceNodes;
     private final String name;
     private final StreamsGraphNode streamsGraphNode;
@@ -47,20 +48,20 @@
     final Initializer<V> reduceInitializer = () -> null;
 
     GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
-                                  final Serde<K> keySerde,
-                                  final Serde<V> valueSerde,
+                                  final GroupedInternal<K, V> groupedInternal,
                                   final boolean repartitionRequired,
                                   final Set<String> sourceNodes,
                                   final String name,
                                   final StreamsGraphNode streamsGraphNode) {
 
         this.builder = builder;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
+        this.keySerde = groupedInternal.keySerde();
+        this.valueSerde = groupedInternal.valueSerde();
         this.repartitionRequired = repartitionRequired;
         this.sourceNodes = sourceNodes;
         this.name = name;
         this.streamsGraphNode = streamsGraphNode;
+        this.userName = groupedInternal.name();
     }
 
     <KR, T> KTable<KR, T> build(final String functionName,
@@ -74,7 +75,7 @@
 
         final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
 
-        final String sourceName = repartitionIfRequired(storeBuilder.name(), repartitionNodeBuilder);
+        final String sourceName = repartitionIfRequired(userName != null ? userName : storeBuilder.name(), repartitionNodeBuilder);
 
         StreamsGraphNode parentNode = streamsGraphNode;
 
@@ -105,19 +106,20 @@
                                 aggregateSupplier,
                                 statefulProcessorNode,
                                 builder);
+
     }
 
     /**
      * @return the new sourceName if repartitioned. Otherwise the name of this stream
      */
-    private String repartitionIfRequired(final String queryableStoreName,
+    private String repartitionIfRequired(final String repartitionTopicNamePrefix,
                                          final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
         if (!repartitionRequired) {
             return this.name;
         }
         // if repartition required the operation
         // captured needs to be set in the graph
-        return KStreamImpl.createRepartitionedSource(builder, keySerde, valueSerde, queryableStoreName, name, optimizableRepartitionNodeBuilder);
+        return KStreamImpl.createRepartitionedSource(builder, keySerde, valueSerde, repartitionTopicNamePrefix, optimizableRepartitionNodeBuilder);
 
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 8f767408fb4..20df0847ccf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -21,9 +21,9 @@
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
@@ -43,7 +43,8 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
@@ -59,8 +60,8 @@
     private final AtomicInteger index = new AtomicInteger(0);
 
     private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
-    private final Map<StreamsGraphNode, Set<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new HashMap<>();
-    private final Set<StreamsGraphNode> mergeNodes = new HashSet<>();
+    private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
+    private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();
 
     private static final String TOPOLOGY_ROOT = "root";
     private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
@@ -251,7 +252,7 @@ private void maybeAddNodeForOptimizationMetadata(final StreamsGraphNode node) {
         }
 
         if (node.isKeyChangingOperation()) {
-            keyChangingOperationsToOptimizableRepartitionNodes.put(node, new HashSet<>());
+            keyChangingOperationsToOptimizableRepartitionNodes.put(node, new LinkedHashSet<>());
         } else if (node instanceof OptimizableRepartitionNode) {
             final StreamsGraphNode parentNode = getKeyChangingParentNode(node);
             if (parentNode != null) {
@@ -305,7 +306,7 @@ private void maybePerformOptimizations(final Properties props) {
     private void maybeOptimizeRepartitionOperations() {
         maybeUpdateKeyChangingRepartitionNodeMap();
 
-        for (final Map.Entry<StreamsGraphNode, Set<OptimizableRepartitionNode>> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
+        for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
 
             final StreamsGraphNode keyChangingNode = entry.getKey();
 
@@ -313,11 +314,13 @@ private void maybeOptimizeRepartitionOperations() {
                 continue;
             }
 
-            final SerializedInternal serialized = new SerializedInternal(getRepartitionSerdes(entry.getValue()));
+            final GroupedInternal groupedInternal = new GroupedInternal(getRepartitionSerdes(entry.getValue()));
 
-            final StreamsGraphNode optimizedSingleRepartition = createRepartitionNode(keyChangingNode.nodeName(),
-                                                                                      serialized.keySerde(),
-                                                                                      serialized.valueSerde());
+            final String repartitionTopicName = getFirstRepartitionTopicName(entry.getValue());
+            //passing in the name of the first repartition topic, re-used to create the optimized repartition topic
+            final StreamsGraphNode optimizedSingleRepartition = createRepartitionNode(repartitionTopicName,
+                                                                                      groupedInternal.keySerde(),
+                                                                                      groupedInternal.valueSerde());
 
             // re-use parent buildPriority to make sure the single repartition graph node is evaluated before downstream nodes
             optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority());
@@ -366,7 +369,7 @@ private void maybeOptimizeRepartitionOperations() {
     private void maybeUpdateKeyChangingRepartitionNodeMap() {
         final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
         for (final StreamsGraphNode mergeNode : mergeNodes) {
-            mergeNodesToKeyChangers.put(mergeNode, new HashSet<>());
+            mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
             final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
             for (final StreamsGraphNode key : keys) {
                 final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
@@ -379,7 +382,7 @@ private void maybeUpdateKeyChangingRepartitionNodeMap() {
         for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
             final StreamsGraphNode mergeKey = entry.getKey();
             final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
-            final Set<OptimizableRepartitionNode> repartitionNodes = new HashSet<>();
+            final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
             for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
                 repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
                 keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
@@ -390,7 +393,7 @@ private void maybeUpdateKeyChangingRepartitionNodeMap() {
     }
 
     @SuppressWarnings("unchecked")
-    private OptimizableRepartitionNode createRepartitionNode(final String name,
+    private OptimizableRepartitionNode createRepartitionNode(final String repartitionTopicName,
                                                              final Serde keySerde,
                                                              final Serde valueSerde) {
 
@@ -398,10 +401,14 @@ private OptimizableRepartitionNode createRepartitionNode(final String name,
         KStreamImpl.createRepartitionedSource(this,
                                               keySerde,
                                               valueSerde,
-                                              name + "-optimized",
-                                              name,
+                                              repartitionTopicName,
                                               repartitionNodeBuilder);
 
+        // ensures setting the repartition topic to the name of the
+        // first repartition topic to get merged
+        // this may be an auto-generated name or a user specified name
+        repartitionNodeBuilder.withRepartitionTopic(repartitionTopicName);
+
         return repartitionNodeBuilder.build();
 
     }
@@ -416,8 +423,12 @@ private StreamsGraphNode getKeyChangingParentNode(final StreamsGraphNode reparti
         return null;
     }
 
+    private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode> repartitionNodes) {
+        return repartitionNodes.iterator().next().repartitionTopic();
+    }
+
     @SuppressWarnings("unchecked")
-    private SerializedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode> repartitionNodes) {
+    private GroupedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode> repartitionNodes) {
         Serde keySerde = null;
         Serde valueSerde = null;
 
@@ -435,7 +446,7 @@ private SerializedInternal getRepartitionSerdes(final Collection<OptimizableRepa
             }
         }
 
-        return new SerializedInternal(Serialized.with(keySerde, valueSerde));
+        return new GroupedInternal(Grouped.with(keySerde, valueSerde));
     }
 
     private StreamsGraphNode findParentNodeMatching(final StreamsGraphNode startSeekingNode,
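
Note on the HashSet to LinkedHashSet change above: getFirstRepartitionTopicName() takes iterator().next(), so "first" is only well defined if the collection keeps insertion order. The patch does not state this reasoning, so treat it as a likely motivation rather than a confirmed one. A minimal, self-contained sketch follows; the class and method names are hypothetical, and plain strings stand in for OptimizableRepartitionNode.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical stand-in for the optimizer's "first repartition topic" lookup.
public class FirstRepartitionTopic {

    // Mirrors the shape of getFirstRepartitionTopicName(): take whatever the iterator yields first.
    static String firstTopicName(final Collection<String> repartitionTopics) {
        return repartitionTopics.iterator().next();
    }

    // LinkedHashSet iterates in insertion order, so the first element added is the first returned.
    static Collection<String> insertionOrdered(final List<String> topics) {
        return new LinkedHashSet<>(topics);
    }

    public static void main(final String[] args) {
        final Collection<String> topics = insertionOrdered(
            Arrays.asList("count-repartition", "aggregate-repartition", "reduce-repartition"));
        System.out.println(firstTopicName(topics));
    }
}
```

With a plain HashSet the iteration order depends on hash codes and table size, so the name picked for the merged repartition topic could differ from the first one declared in the topology.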
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index da2eeb6ab8f..e53e37f3f65 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -45,17 +44,15 @@
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
 
     KGroupedStreamImpl(final String name,
-                       final Serde<K> keySerde,
-                       final Serde<V> valSerde,
                        final Set<String> sourceNodes,
+                       final GroupedInternal<K, V> groupedInternal,
                        final boolean repartitionRequired,
                        final StreamsGraphNode streamsGraphNode,
                        final InternalStreamsBuilder builder) {
-        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
+        super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), sourceNodes, streamsGraphNode, builder);
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
             builder,
-            keySerde,
-            valSerde,
+            groupedInternal,
             repartitionRequired,
             sourceNodes,
             name,
@@ -165,7 +162,7 @@
             name,
             keySerde,
             valSerde,
-            repartitionRequired,
+            aggregateBuilder,
             streamsGraphNode
         );
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 6ec3c0d1481..c97576b8aa0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -48,6 +47,8 @@
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
+    protected final String userSpecifiedName;
+
     private final Initializer<Long> countInitializer = () -> 0L;
 
     private final Aggregator<K, V, Long> countAdder = (aggKey, value, aggregate) -> aggregate + 1L;
@@ -57,10 +58,11 @@
     KGroupedTableImpl(final InternalStreamsBuilder builder,
                       final String name,
                       final Set<String> sourceNodes,
-                      final Serde<K> keySerde,
-                      final Serde<V> valSerde,
+                      final GroupedInternal<K, V> groupedInternal,
                       final StreamsGraphNode streamsGraphNode) {
-        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
+        super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), sourceNodes, streamsGraphNode, builder);
+
+        this.userSpecifiedName = groupedInternal.name();
     }
 
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
@@ -69,9 +71,10 @@
         final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
         final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newProcessorName(functionName);
-        final String topic = materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+        final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName())
+                + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
-        final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, topic);
+        final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic);
 
         // the passed in StreamsGraphNode must be the parent of the repartition node
         builder.addGraphNode(this.streamsGraphNode, repartitionNode);
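
The naming rule applied by the grouping changes above, and by the join changes in KStreamImpl.java that follow, can be sketched without any Kafka dependency. This is an illustration under stated assumptions, not the library's code: the class, method names, and sample values are hypothetical, and the "${applicationId}-" prefix is added here only to match the Javadoc's description of the final topic name (the code in this patch appends only the "-repartition" suffix).

```java
// Hypothetical, dependency-free sketch of the repartition topic naming rule in this patch.
public class RepartitionTopicNames {

    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";

    // Grouping: prefer the user-supplied name, else fall back to the generated (store) name.
    static String groupingBaseName(final String userName, final String generatedName) {
        return userName != null ? userName : generatedName;
    }

    // Join: a user-supplied name gets a "-left" or "-right" suffix; otherwise the generated name is used as-is.
    static String joinBaseName(final String joinedName, final boolean leftSide, final String generatedName) {
        if (joinedName == null) {
            return generatedName;
        }
        return joinedName + (leftSide ? "-left" : "-right");
    }

    // Final topic name as described in the Javadoc: "${applicationId}-<name>-repartition".
    static String topicName(final String applicationId, final String baseName) {
        return applicationId + "-" + baseName + REPARTITION_TOPIC_SUFFIX;
    }

    public static void main(final String[] args) {
        final String generated = "KSTREAM-AGGREGATE-STATE-STORE-0000000002"; // sample value only
        System.out.println(topicName("my-app", groupingBaseName("orders-by-customer", generated)));
        System.out.println(topicName("my-app", groupingBaseName(null, generated)));
        System.out.println(topicName("my-app", joinBaseName("clicks-join", true, generated)));
    }
}
```

A user-supplied name keeps the topic name stable when the topology changes, whereas the generated fallback embeds a processor index that can shift.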
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2a3bc8f9230..168e210cbe4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KGroupedStream;
@@ -565,11 +566,15 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
 
         if (joinThis.repartitionRequired) {
-            joinThis = joinThis.repartitionForJoin(joined);
+            final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name;
+
+            joinThis = joinThis.repartitionForJoin(Joined.with(joined.keySerde(), joined.valueSerde(), joined.otherValueSerde(), leftJoinRepartitionTopicName));
         }
 
         if (joinOther.repartitionRequired) {
-            joinOther = joinOther.repartitionForJoin(Joined.with(joined.keySerde(), joined.otherValueSerde(), joined.valueSerde()));
+            final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name;
+            final Joined newJoined = Joined.with(joined.keySerde(), joined.valueSerde(), joined.otherValueSerde(), rightJoinRepartitionTopicName);
+            joinOther = joinOther.repartitionForJoin(newJoined);
         }
 
         joinThis.ensureJoinableWith(joinOther);
@@ -597,8 +602,7 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         final String repartitionedSourceName = createRepartitionedSource(builder,
                                                                          repartitionKeySerde,
                                                                          repartitionValueSerde,
-                                                                         null,
-                                                                         name,
+                                                                         joined.name(),
                                                                          optimizableRepartitionNodeBuilder);
 
         final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
@@ -610,12 +614,11 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
     static <K1, V1> String createRepartitionedSource(final InternalStreamsBuilder builder,
                                                      final Serde<K1> keySerde,
                                                      final Serde<V1> valSerde,
-                                                     final String topicNamePrefix,
-                                                     final String name,
+                                                     final String repartitionTopicNamePrefix,
                                                      final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
 
-        final String baseName = topicNamePrefix != null ? topicNamePrefix : name;
-        final String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
+
+        final String repartitionTopic = repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
         final String sinkName = builder.newProcessorName(SINK_NAME);
         final String nullKeyFilterProcessorName = builder.newProcessorName(FILTER_NAME);
         final String sourceName = builder.newProcessorName(SOURCE_NAME);
@@ -677,7 +680,8 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined);
+            final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name);
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined);
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, false);
         } else {
             return doStreamTableJoin(other, joiner, joined, false);
@@ -697,7 +701,8 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined);
+            final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name);
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined);
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, true);
         } else {
             return doStreamTableJoin(other, joiner, joined, true);
@@ -776,47 +781,68 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
 
         // do not have serde for joined result
         return new KStreamImpl<>(name, joined.keySerde() != null ? joined.keySerde() : keySerde, null, allSourceNodes, false, streamTableJoinNode, builder);
+
     }
 
     @Override
     public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector) {
-        return groupBy(selector, Serialized.with(null, null));
+        return groupBy(selector, Grouped.with(null, valSerde));
     }
 
     @Override
+    @Deprecated
     public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                               final Serialized<KR, V> serialized) {
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(serialized, "serialized can't be null");
         final SerializedInternal<KR, V> serializedInternal = new SerializedInternal<>(serialized);
+
+        return groupBy(selector, Grouped.with(serializedInternal.keySerde(), serializedInternal.valueSerde()));
+    }
+
+    @Override
+    public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
+                                              final Grouped<KR, V> grouped) {
+        Objects.requireNonNull(selector, "selector can't be null");
+        Objects.requireNonNull(grouped, "grouped can't be null");
+        final GroupedInternal<KR, V> groupedInternal = new GroupedInternal<>(grouped);
         final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(selector);
         selectKeyMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode);
-        return new KGroupedStreamImpl<>(selectKeyMapNode.nodeName(),
-                                        serializedInternal.keySerde(),
-                                        serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde,
-                                        sourceNodes,
-                                        true,
-                                        selectKeyMapNode,
-                                        builder);
+
+        return new KGroupedStreamImpl<>(
+            selectKeyMapNode.nodeName(),
+            sourceNodes,
+            groupedInternal,
+            true,
+            selectKeyMapNode,
+            builder);
     }
 
     @Override
     public KGroupedStream<K, V> groupByKey() {
-        return groupByKey(Serialized.with(null, null));
+        return groupByKey(Grouped.with(keySerde, valSerde));
     }
 
     @Override
+    @Deprecated
     public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) {
         final SerializedInternal<K, V> serializedInternal = new SerializedInternal<>(serialized);
-        return new KGroupedStreamImpl<>(this.name,
-                                        serializedInternal.keySerde() != null ? serializedInternal.keySerde() : keySerde,
-                                        serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde,
-                                        sourceNodes,
-                                        this.repartitionRequired,
-                                        streamsGraphNode,
-                                        builder);
+        return groupByKey(Grouped.with(serializedInternal.keySerde(), serializedInternal.valueSerde()));
+    }
+
+    @Override
+    public KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped) {
+        final GroupedInternal<K, V> groupedInternal = new GroupedInternal<>(grouped);
+
+        return new KGroupedStreamImpl<>(
+            name,
+            sourceNodes,
+            groupedInternal,
+            repartitionRequired,
+            streamsGraphNode,
+            builder);
     }
 
     @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c5b29702c7c..53e7a4ba3bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -552,14 +553,24 @@ public String queryableStoreName() {
 
     @Override
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
-        return groupBy(selector, Serialized.with(null, null));
+        return this.groupBy(selector, Grouped.with(null, null));
     }
 
     @Override
+    @Deprecated
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
                                                   final Serialized<K1, V1> serialized) {
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(serialized, "serialized can't be null");
+        final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized);
+        return groupBy(selector, Grouped.with(serializedInternal.keySerde(), serializedInternal.valueSerde()));
+    }
+
+    @Override
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
+                                                  final Grouped<K1, V1> grouped) {
+        Objects.requireNonNull(selector, "selector can't be null");
+        Objects.requireNonNull(grouped, "grouped can't be null");
         final String selectName = builder.newProcessorName(SELECT_NAME);
 
         final KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
@@ -571,18 +582,13 @@ public String queryableStoreName() {
         builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
 
         this.enableSendingOldValues();
-
-        final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized);
-
-        // we cannot inherit parent key and value serdes since both of them may have changed;
-        // we can only inherit from what serialized specified here
+        final GroupedInternal<K1, V1> groupedInternal = new GroupedInternal<>(grouped);
         return new KGroupedTableImpl<>(
-            builder,
-            selectName,
-            sourceNodes,
-            serializedInternal.keySerde(),
-            serializedInternal.valueSerde(),
-            groupByMapNode
+                builder,
+                selectName,
+                sourceNodes,
+                groupedInternal,
+                groupByMapNode
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
index c6df11fb562..0cb7050cd8e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Serialized;
 
+@Deprecated
 public class SerializedInternal<K, V> extends Serialized<K, V> {
     public SerializedInternal(final Serialized<K, V> serialized) {
         super(serialized);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 2ee8f7c5958..8519671f6b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -52,11 +52,11 @@
                             final String name,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde,
-                            final boolean repartitionRequired,
+                            final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
                             final StreamsGraphNode streamsGraphNode) {
         super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
         this.windows = Objects.requireNonNull(windows, "windows can't be null");
-        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name, streamsGraphNode);
+        this.aggregateBuilder = aggregateBuilder;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
index 4d81b1f699c..05ec6fb89b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -54,6 +54,10 @@
         return valueSerde;
     }
 
+    public String repartitionTopic() {
+        return repartitionTopic;
+    }
+
     @Override
     Serializer<V> getValueSerializer() {
         return valueSerde != null ? valueSerde.serializer() : null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
index 5eebf0411f3..cde2349fe5c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.integration;
 
 
+import kafka.utils.MockTime;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -58,8 +59,6 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import kafka.utils.MockTime;
-
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -273,11 +272,11 @@ public void process(final String key, final String value) {
                                                               + "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n"
                                                               + "      --> none\n"
                                                               + "      <-- KSTREAM-MAPVALUES-0000000003\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-MAP-0000000001-optimized-repartition)\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n"
                                                               + "      <-- KSTREAM-FILTER-0000000040\n"
                                                               + "\n"
                                                               + "  Sub-topology: 1\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-MAP-0000000001-optimized-repartition])\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n"
                                                               + "      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n"
                                                               + "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
                                                               + "      --> KTABLE-TOSTREAM-0000000011\n"
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
index af1f5f19fd5..29901a616d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.integration;
 
 
+import kafka.utils.MockTime;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -49,8 +50,6 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import kafka.utils.MockTime;
-
 import static org.junit.Assert.assertEquals;
 
 @Category({IntegrationTest.class})
@@ -214,11 +213,11 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) {
                                                               + "    Processor: KSTREAM-FILTER-0000000021 (stores: [])\n"
                                                               + "      --> KSTREAM-SINK-0000000020\n"
                                                               + "      <-- KSTREAM-MERGE-0000000004\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-MERGE-0000000004-optimized-repartition)\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n"
                                                               + "      <-- KSTREAM-FILTER-0000000021\n"
                                                               + "\n"
                                                               + "  Sub-topology: 1\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-MERGE-0000000004-optimized-repartition])\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n"
                                                               + "      --> KSTREAM-AGGREGATE-0000000006, KSTREAM-AGGREGATE-0000000013\n"
                                                               + "    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n"
                                                               + "      --> KTABLE-TOSTREAM-0000000017\n"
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
new file mode 100644
index 00000000000..872ae5ccd42
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RepartitionTopicNamingTest {
+
+    private final KeyValueMapper<String, String, String> kvMapper = (k, v) -> k + v;
+    private static final String INPUT_TOPIC = "input";
+    private static final String COUNT_TOPIC = "outputTopic_0";
+    private static final String AGGREGATION_TOPIC = "outputTopic_1";
+    private static final String REDUCE_TOPIC = "outputTopic_2";
+    private static final String JOINED_TOPIC = "outputTopicForJoin";
+
+    private final String firstRepartitionTopicName = "count-stream";
+    private final String secondRepartitionTopicName = "aggregate-stream";
+    private final String thirdRepartitionTopicName = "reduced-stream";
+    private final String fourthRepartitionTopicName = "joined-stream";
+
+
+    @Test
+    public void shouldReuseFirstRepartitionTopicNameWhenOptimizing() {
+
+        final String optimizedTopology = buildTopology(StreamsConfig.OPTIMIZE).describe().toString();
+        final String unOptimizedTopology = buildTopology(StreamsConfig.NO_OPTIMIZATION).describe().toString();
+        final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+
+        assertThat(optimizedTopology, is(EXPECTED_OPTIMIZED_TOPOLOGY));
+        // only one repartition topic
+        assertThat(1, is(getCountOfRepartitionTopicsFound(optimizedTopology, repartitionTopicPattern)));
+        // the first named repartition topic
+        assertTrue(optimizedTopology.contains(firstRepartitionTopicName + "-repartition"));
+
+
+        assertThat(unOptimizedTopology, is(EXPECTED_UNOPTIMIZED_TOPOLOGY));
+        // now 4 repartition topics
+        assertThat(4, is(getCountOfRepartitionTopicsFound(unOptimizedTopology, repartitionTopicPattern)));
+        // all 4 named repartition topics present
+        assertTrue(unOptimizedTopology.contains(firstRepartitionTopicName + "-repartition"));
+        assertTrue(unOptimizedTopology.contains(secondRepartitionTopicName + "-repartition"));
+        assertTrue(unOptimizedTopology.contains(thirdRepartitionTopicName + "-repartition"));
+        assertTrue(unOptimizedTopology.contains(fourthRepartitionTopicName + "-left-repartition"));
+
+    }
+
+    // can't use same repartition topic name
+    @Test
+    public void shouldFailWithSameRepartitionTopicName() {
+        try {
+            final StreamsBuilder builder = new StreamsBuilder();
+            builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")).count().toStream();
+            builder.<String, String>stream("topicII").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")).count().toStream();
+            builder.build();
+            fail("Should not build re-using repartition topic name");
+        } catch (final TopologyException te) {
+            // ok
+        }
+    }
+
+    // without optimization, each aggregation on a KGroupedStream results in its own
+    // repartition topic, so a KGroupedStream can't be reused when the repartition topic is named;
+    // use a separate groupByKey call for each aggregation when naming repartition topics
+    // see test shouldHandleUniqueGroupedInstances below for an example
+    @Test
+    public void shouldFailWithSameRepartitionTopicNameUsingSameKGroupedStream() {
+        try {
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
+            kGroupedStream.windowedBy(TimeWindows.of(10)).count();
+            kGroupedStream.windowedBy(TimeWindows.of(30)).count();
+            builder.build();
+            fail("Should not build re-using repartition topic name");
+        } catch (final TopologyException te) {
+            // ok
+        }
+    }
+
+
+    // can't use same repartition topic name in joins
+    @Test
+    public void shouldFailWithSameRepartitionTopicNameInJoin() {
+        try {
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KStream<String, String> stream1 = builder.<String, String>stream("topic").selectKey((k, v) -> k);
+            final KStream<String, String> stream2 = builder.<String, String>stream("topic2").selectKey((k, v) -> k);
+            final KStream<String, String> stream3 = builder.<String, String>stream("topic3").selectKey((k, v) -> k);
+
+            final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition"));
+            joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition"));
+            builder.build();
+            fail("Should not build re-using repartition topic name");
+        } catch (final TopologyException te) {
+            // ok
+        }
+    }
+
+
+    @Test
+    public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
+        kGroupedStream.windowedBy(TimeWindows.of(10)).count();
+        kGroupedStream.windowedBy(TimeWindows.of(30)).count();
+        builder.build(properties);
+    }
+
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForJoins() {
+
+        final String expectedLeftRepartitionTopic = "(topic: my-join-left-repartition)";
+        final String expectedRightRepartitionTopic = "(topic: my-join-right-repartition)";
+
+
+        final String joinTopologyFirst = buildStreamJoin(false);
+
+        assertTrue(joinTopologyFirst.contains(expectedLeftRepartitionTopic));
+        assertTrue(joinTopologyFirst.contains(expectedRightRepartitionTopic));
+
+        final String joinTopologyUpdated = buildStreamJoin(true);
+
+        assertTrue(joinTopologyUpdated.contains(expectedLeftRepartitionTopic));
+        assertTrue(joinTopologyUpdated.contains(expectedRightRepartitionTopic));
+    }
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByKeyTimeWindows() {
+
+        final String expectedTimeWindowRepartitionTopic = "(topic: time-window-grouping-repartition)";
+
+        final String timeWindowGroupingRepartitionTopology = buildStreamGroupByKeyTimeWindows(false, true);
+        assertTrue(timeWindowGroupingRepartitionTopology.contains(expectedTimeWindowRepartitionTopic));
+
+        final String timeWindowGroupingUpdatedTopology = buildStreamGroupByKeyTimeWindows(true, true);
+        assertTrue(timeWindowGroupingUpdatedTopology.contains(expectedTimeWindowRepartitionTopic));
+    }
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByTimeWindows() {
+
+        final String expectedTimeWindowRepartitionTopic = "(topic: time-window-grouping-repartition)";
+
+        final String timeWindowGroupingRepartitionTopology = buildStreamGroupByKeyTimeWindows(false, false);
+        assertTrue(timeWindowGroupingRepartitionTopology.contains(expectedTimeWindowRepartitionTopic));
+
+        final String timeWindowGroupingUpdatedTopology = buildStreamGroupByKeyTimeWindows(true, false);
+        assertTrue(timeWindowGroupingUpdatedTopology.contains(expectedTimeWindowRepartitionTopic));
+    }
+
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByKeyNoWindows() {
+
+        final String expectedNoWindowRepartitionTopic = "(topic: kstream-grouping-repartition)";
+
+        final String noWindowGroupingRepartitionTopology = buildStreamGroupByKeyNoWindows(false, true);
+        assertTrue(noWindowGroupingRepartitionTopology.contains(expectedNoWindowRepartitionTopic));
+
+        final String noWindowGroupingUpdatedTopology = buildStreamGroupByKeyNoWindows(true, true);
+        assertTrue(noWindowGroupingUpdatedTopology.contains(expectedNoWindowRepartitionTopic));
+    }
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByNoWindows() {
+
+        final String expectedNoWindowRepartitionTopic = "(topic: kstream-grouping-repartition)";
+
+        final String noWindowGroupingRepartitionTopology = buildStreamGroupByKeyNoWindows(false, false);
+        assertTrue(noWindowGroupingRepartitionTopology.contains(expectedNoWindowRepartitionTopic));
+
+        final String noWindowGroupingUpdatedTopology = buildStreamGroupByKeyNoWindows(true, false);
+        assertTrue(noWindowGroupingUpdatedTopology.contains(expectedNoWindowRepartitionTopic));
+    }
+
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByKeySessionWindows() {
+
+        final String expectedSessionWindowRepartitionTopic = "(topic: session-window-grouping-repartition)";
+
+        final String sessionWindowGroupingRepartitionTopology = buildStreamGroupByKeySessionWindows(false, true);
+        assertTrue(sessionWindowGroupingRepartitionTopology.contains(expectedSessionWindowRepartitionTopic));
+
+        final String sessionWindowGroupingUpdatedTopology = buildStreamGroupByKeySessionWindows(true, true);
+        assertTrue(sessionWindowGroupingUpdatedTopology.contains(expectedSessionWindowRepartitionTopic));
+    }
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupBySessionWindows() {
+
+        final String expectedSessionWindowRepartitionTopic = "(topic: session-window-grouping-repartition)";
+
+        final String sessionWindowGroupingRepartitionTopology = buildStreamGroupByKeySessionWindows(false, false);
+        assertTrue(sessionWindowGroupingRepartitionTopology.contains(expectedSessionWindowRepartitionTopic));
+
+        final String sessionWindowGroupingUpdatedTopology = buildStreamGroupByKeySessionWindows(true, false);
+        assertTrue(sessionWindowGroupingUpdatedTopology.contains(expectedSessionWindowRepartitionTopic));
+    }
+
+    @Test
+    public void shouldKeepRepartitionNameForGroupByKTable() {
+        final String expectedKTableGroupByRepartitionTopic = "(topic: ktable-group-by-repartition)";
+
+        final String ktableGroupByTopology = buildKTableGroupBy(false);
+        assertTrue(ktableGroupByTopology.contains(expectedKTableGroupByRepartitionTopic));
+
+        final String ktableUpdatedGroupByTopology = buildKTableGroupBy(true);
+        assertTrue(ktableUpdatedGroupByTopology.contains(expectedKTableGroupByRepartitionTopic));
+    }
+
+
+    private String buildKTableGroupBy(final boolean otherOperations) {
+        final String ktableGroupByTopicName = "ktable-group-by";
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, String> ktable = builder.table("topic");
+
+        if (otherOperations) {
+            ktable.filter((k, v) -> true).groupBy(KeyValue::pair, Grouped.as(ktableGroupByTopicName)).count();
+        } else {
+            ktable.groupBy(KeyValue::pair, Grouped.as(ktableGroupByTopicName)).count();
+        }
+
+        return builder.build().describe().toString();
+    }
+
+    private String buildStreamGroupByKeyTimeWindows(final boolean otherOperations, final boolean isGroupByKey) {
+
+        final String groupedTimeWindowRepartitionTopicName = "time-window-grouping";
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> selectKeyStream = builder.<String, String>stream("topic").selectKey((k, v) -> k + v);
+
+
+        if (isGroupByKey) {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+            } else {
+                selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+            }
+        } else {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName)).count();
+            } else {
+                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName)).count();
+            }
+        }
+
+        return builder.build().describe().toString();
+    }
+
+
+    private String buildStreamGroupByKeySessionWindows(final boolean otherOperations, final boolean isGroupByKey) {
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> selectKeyStream = builder.<String, String>stream("topic").selectKey((k, v) -> k + v);
+
+        final String groupedSessionWindowRepartitionTopicName = "session-window-grouping";
+        if (isGroupByKey) {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+            } else {
+                selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+            }
+        } else {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+            } else {
+                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+            }
+        }
+
+        return builder.build().describe().toString();
+    }
+
+
+    private String buildStreamGroupByKeyNoWindows(final boolean otherOperations, final boolean isGroupByKey) {
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> selectKeyStream = builder.<String, String>stream("topic").selectKey((k, v) -> k + v);
+
+        final String groupByAndCountRepartitionTopicName = "kstream-grouping";
+        if (isGroupByKey) {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupByAndCountRepartitionTopicName)).count();
+            } else {
+                selectKeyStream.groupByKey(Grouped.as(groupByAndCountRepartitionTopicName)).count();
+            }
+        } else {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupByAndCountRepartitionTopicName)).count();
+            } else {
+                selectKeyStream.groupBy(kvMapper, Grouped.as(groupByAndCountRepartitionTopicName)).count();
+            }
+        }
+
+        return builder.build().describe().toString();
+    }
+
+    private String buildStreamJoin(final boolean includeOtherOperations) {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> initialStreamOne = builder.stream("topic-one");
+        final KStream<String, String> initialStreamTwo = builder.stream("topic-two");
+
+        final KStream<String, String> updatedStreamOne;
+        final KStream<String, String> updatedStreamTwo;
+
+        if (includeOtherOperations) {
+            // without naming the join, the repartition topic name would change because the operators preceding the join have changed
+            updatedStreamOne = initialStreamOne.selectKey((k, v) -> k + v).filter((k, v) -> true).peek((k, v) -> System.out.println(k + v));
+            updatedStreamTwo = initialStreamTwo.selectKey((k, v) -> k + v).filter((k, v) -> true).peek((k, v) -> System.out.println(k + v));
+        } else {
+            updatedStreamOne = initialStreamOne.selectKey((k, v) -> k + v);
+            updatedStreamTwo = initialStreamTwo.selectKey((k, v) -> k + v);
+        }
+
+        final String joinRepartitionTopicName = "my-join";
+        updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2,
+                JoinWindows.of(1000), Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), joinRepartitionTopicName));
+
+        return builder.build().describe().toString();
+    }
+
+
+    private int getCountOfRepartitionTopicsFound(final String topologyString, final Pattern repartitionTopicPattern) {
+        final Matcher matcher = repartitionTopicPattern.matcher(topologyString);
+        final List<String> repartitionTopicsFound = new ArrayList<>();
+        while (matcher.find()) {
+            repartitionTopicsFound.add(matcher.group());
+        }
+        return repartitionTopicsFound.size();
+    }
+
+
+    private Topology buildTopology(final String optimizationConfig) {
+        final Initializer<Integer> initializer = () -> 0;
+        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
+        final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2;
+        final List<String> processorValueCollector = new ArrayList<>();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> sourceStream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> mappedStream = sourceStream.map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v));
+
+        mappedStream.filter((k, v) -> k.equals("B")).mapValues(v -> v.toUpperCase(Locale.getDefault()))
+                .process(() -> new SimpleProcessor(processorValueCollector));
+
+        final KStream<String, Long> countStream = mappedStream.groupByKey(Grouped.as(firstRepartitionTopicName)).count(Materialized.with(Serdes.String(), Serdes.Long())).toStream();
+
+        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
+
+        mappedStream.groupByKey(Grouped.as(secondRepartitionTopicName)).aggregate(initializer,
+                aggregator,
+                Materialized.with(Serdes.String(), Serdes.Integer()))
+                .toStream().to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
+
+        // adding operators for case where the repartition node is further downstream
+        mappedStream.filter((k, v) -> true).peek((k, v) -> System.out.println(k + ":" + v)).groupByKey(Grouped.as(thirdRepartitionTopicName))
+                .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
+                .toStream().to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
+
+        mappedStream.filter((k, v) -> k.equals("A"))
+                .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
+                        JoinWindows.of(5000),
+                        Joined.with(Serdes.String(), Serdes.String(), Serdes.Long(), fourthRepartitionTopicName))
+                .to(JOINED_TOPIC);
+
+        final Properties properties = new Properties();
+
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
+        return builder.build(properties);
+    }
+
+
+    private static class SimpleProcessor extends AbstractProcessor<String, String> {
+
+        final List<String> valueList;
+
+        SimpleProcessor(final List<String> valueList) {
+            this.valueList = valueList;
+        }
+
+        @Override
+        public void process(final String key, final String value) {
+            valueList.add(value);
+        }
+    }
+
+
+    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n" +
+            "   Sub-topology: 0\n" +
+            "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+            "      --> KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040\n" +
+            "      <-- KSTREAM-SOURCE-0000000000\n" +
+            "    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n" +
+            "      --> KSTREAM-MAPVALUES-0000000003\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000040 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000039\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" +
+            "      --> KSTREAM-PROCESSOR-0000000004\n" +
+            "      <-- KSTREAM-FILTER-0000000002\n" +
+            "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n" +
+            "      --> none\n" +
+            "      <-- KSTREAM-MAPVALUES-0000000003\n" +
+            "    Sink: KSTREAM-SINK-0000000039 (topic: count-stream-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000040\n" +
+            "\n" +
+            "  Sub-topology: 1\n" +
+            "    Source: KSTREAM-SOURCE-0000000041 (topics: [count-stream-repartition])\n" +
+            "      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n" +
+            "      --> KTABLE-TOSTREAM-0000000011\n" +
+            "      <-- KSTREAM-SOURCE-0000000041\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000007\n" +
+            "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" +
+            "      --> KSTREAM-PEEK-0000000021\n" +
+            "      <-- KSTREAM-SOURCE-0000000041\n" +
+            "    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" +
+            "      --> KSTREAM-WINDOWED-0000000033\n" +
+            "      <-- KSTREAM-SOURCE-0000000041\n" +
+            "    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" +
+            "      --> KSTREAM-REDUCE-0000000023\n" +
+            "      <-- KSTREAM-FILTER-0000000020\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
+            "      --> KSTREAM-JOINTHIS-0000000035\n" +
+            "      <-- KSTREAM-FILTER-0000000029\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
+            "      --> KSTREAM-JOINOTHER-0000000036\n" +
+            "      <-- KTABLE-TOSTREAM-0000000011\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" +
+            "      --> KTABLE-TOSTREAM-0000000018\n" +
+            "      <-- KSTREAM-SOURCE-0000000041\n" +
+            "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
+            "      --> KSTREAM-MERGE-0000000037\n" +
+            "      <-- KSTREAM-WINDOWED-0000000034\n" +
+            "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
+            "      --> KSTREAM-MERGE-0000000037\n" +
+            "      <-- KSTREAM-WINDOWED-0000000033\n" +
+            "    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" +
+            "      --> KTABLE-TOSTREAM-0000000027\n" +
+            "      <-- KSTREAM-PEEK-0000000021\n" +
+            "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000038\n" +
+            "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000019\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000014\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000028\n" +
+            "      <-- KSTREAM-REDUCE-0000000023\n" +
+            "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000011\n" +
+            "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000018\n" +
+            "    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000027\n" +
+            "    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n" +
+            "      <-- KSTREAM-MERGE-0000000037\n\n";
+
+
+    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" +
+            "   Sub-topology: 0\n" +
+            "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+            "      --> KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n" +
+            "      <-- KSTREAM-SOURCE-0000000000\n" +
+            "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" +
+            "      --> KSTREAM-PEEK-0000000021\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n" +
+            "      --> KSTREAM-MAPVALUES-0000000003\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000031\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000025\n" +
+            "      <-- KSTREAM-FILTER-0000000020\n" +
+            "    Processor: KSTREAM-FILTER-0000000009 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000008\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000016 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000015\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000025 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000024\n" +
+            "      <-- KSTREAM-PEEK-0000000021\n" +
+            "    Processor: KSTREAM-FILTER-0000000031 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000030\n" +
+            "      <-- KSTREAM-FILTER-0000000029\n" +
+            "    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" +
+            "      --> KSTREAM-PROCESSOR-0000000004\n" +
+            "      <-- KSTREAM-FILTER-0000000002\n" +
+            "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n" +
+            "      --> none\n" +
+            "      <-- KSTREAM-MAPVALUES-0000000003\n" +
+            "    Sink: KSTREAM-SINK-0000000008 (topic: count-stream-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000009\n" +
+            "    Sink: KSTREAM-SINK-0000000015 (topic: aggregate-stream-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000016\n" +
+            "    Sink: KSTREAM-SINK-0000000024 (topic: reduced-stream-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000025\n" +
+            "    Sink: KSTREAM-SINK-0000000030 (topic: joined-stream-left-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000031\n" +
+            "\n" +
+            "  Sub-topology: 1\n" +
+            "    Source: KSTREAM-SOURCE-0000000010 (topics: [count-stream-repartition])\n" +
+            "      --> KSTREAM-AGGREGATE-0000000007\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n" +
+            "      --> KTABLE-TOSTREAM-0000000011\n" +
+            "      <-- KSTREAM-SOURCE-0000000010\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000007\n" +
+            "    Source: KSTREAM-SOURCE-0000000032 (topics: [joined-stream-left-repartition])\n" +
+            "      --> KSTREAM-WINDOWED-0000000033\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
+            "      --> KSTREAM-JOINTHIS-0000000035\n" +
+            "      <-- KSTREAM-SOURCE-0000000032\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
+            "      --> KSTREAM-JOINOTHER-0000000036\n" +
+            "      <-- KTABLE-TOSTREAM-0000000011\n" +
+            "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
+            "      --> KSTREAM-MERGE-0000000037\n" +
+            "      <-- KSTREAM-WINDOWED-0000000034\n" +
+            "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
+            "      --> KSTREAM-MERGE-0000000037\n" +
+            "      <-- KSTREAM-WINDOWED-0000000033\n" +
+            "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000038\n" +
+            "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" +
+            "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000011\n" +
+            "    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n" +
+            "      <-- KSTREAM-MERGE-0000000037\n" +
+            "\n" +
+            "  Sub-topology: 2\n" +
+            "    Source: KSTREAM-SOURCE-0000000017 (topics: [aggregate-stream-repartition])\n" +
+            "      --> KSTREAM-AGGREGATE-0000000014\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" +
+            "      --> KTABLE-TOSTREAM-0000000018\n" +
+            "      <-- KSTREAM-SOURCE-0000000017\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000019\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000014\n" +
+            "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000018\n" +
+            "\n" +
+            "  Sub-topology: 3\n" +
+            "    Source: KSTREAM-SOURCE-0000000026 (topics: [reduced-stream-repartition])\n" +
+            "      --> KSTREAM-REDUCE-0000000023\n" +
+            "    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" +
+            "      --> KTABLE-TOSTREAM-0000000027\n" +
+            "      <-- KSTREAM-SOURCE-0000000026\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000028\n" +
+            "      <-- KSTREAM-REDUCE-0000000023\n" +
+            "    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000027\n\n";
+
+
+}


 



> Naming Join and Grouping Repartition Topics
> -------------------------------------------
>
>                 Key: KAFKA-7406
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7406
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 2.1.0
>
>
> To help make Streams compatible with topology changes, we need to give users the ability to name some operators, so that a rolling upgrade remains possible after the topology is adjusted.
> This Jira is the first step in this effort to give operators deterministic names.
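
The idea in the description above can be illustrated with a small, self-contained sketch. It is a toy model and not the Kafka Streams implementation: ToyBuilder, addOperator, addGrouping and the KSTREAM-KEY-SELECT prefix are hypothetical names introduced only for this example. The sketch assumes that generated names carry a zero-padded running index (as in the KSTREAM-FILTER-0000000002 style names in the expected topologies) and that a user-supplied name gets a "-repartition" suffix (as in "kstream-grouping-repartition" in the tests).

```java
class RepartitionNamingSketch {

    // Toy stand-in for a topology builder (hypothetical, not a Kafka class).
    // Every operator added consumes the next index of a shared counter.
    static final class ToyBuilder {
        private int nextIndex = 0;

        String addOperator(final String prefix) {
            // Zero-padded to ten digits, e.g. KSTREAM-FILTER-0000000002
            return String.format("%s-%010d", prefix, nextIndex++);
        }

        // Returns the repartition topic name for a grouping operation:
        // the user-supplied name if given, else the generated operator name.
        String addGrouping(final String userName) {
            final String generated = addOperator("KSTREAM-AGGREGATE-STATE-STORE");
            final String base = userName != null ? userName : generated;
            return base + "-repartition";
        }
    }

    // Builds source -> selectKey -> [filter -> mapValues] -> grouping
    // and returns the resulting repartition topic name.
    static String build(final boolean otherOperations, final String groupingName) {
        final ToyBuilder builder = new ToyBuilder();
        builder.addOperator("KSTREAM-SOURCE");
        builder.addOperator("KSTREAM-KEY-SELECT");
        if (otherOperations) {
            builder.addOperator("KSTREAM-FILTER");
            builder.addOperator("KSTREAM-MAPVALUES");
        }
        return builder.addGrouping(groupingName);
    }

    public static void main(final String[] args) {
        // Generated names shift when operators are inserted upstream.
        System.out.println(build(false, null));
        System.out.println(build(true, null));
        // An explicit name is unaffected by upstream changes.
        System.out.println(build(false, "kstream-grouping"));
        System.out.println(build(true, "kstream-grouping"));
    }
}
```

In this model the unnamed grouping yields a different topic name once the two extra operators are added, while the named grouping yields the same topic name in both cases, which is the property the tests in the diff check via Grouped.as(...) and Joined.with(...).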


