You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/05/27 23:27:31 UTC

[kafka] branch 2.5 updated: KAFKA-9298: reuse mapped stream error in joins (#8504)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new f5c1da8  KAFKA-9298: reuse mapped stream error in joins (#8504)
f5c1da8 is described below

commit f5c1da8f5520ffff900d5cc92ea30efec7e61845
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed May 27 14:54:41 2020 -0400

    KAFKA-9298: reuse mapped stream error in joins (#8504)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../streams/kstream/internals/KStreamImpl.java     |  13 +-
 .../kstream/internals/KStreamKStreamJoinTest.java  | 152 +++++++++++++++++++++
 .../kstream/internals/KStreamKTableJoinTest.java   | 151 ++++++++++++++++++--
 3 files changed, 298 insertions(+), 18 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 308aafe..e72dc17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -120,6 +120,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     private final boolean repartitionRequired;
 
+    private OptimizableRepartitionNode<K, V> repartitionNode;
+
     KStreamImpl(final String name,
                 final Serde<K> keySerde,
                 final Serde<V> valueSerde,
@@ -918,6 +920,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde;
         final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder =
             OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+        // we still need to create the repartitioned source each time
+        // as it increments the counter which
+        // is needed to maintain topology compatibility
         final String repartitionedSourceName = createRepartitionedSource(
             builder,
             repartitionKeySerde,
@@ -925,8 +930,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             repartitionName,
             optimizableRepartitionNodeBuilder);
 
-        final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
-        builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+        if (repartitionNode == null || !name.equals(repartitionName)) {
+            repartitionNode = optimizableRepartitionNodeBuilder.build();
+            builder.addGraphNode(streamsGraphNode, repartitionNode);
+        }
 
         return new KStreamImpl<>(
             repartitionedSourceName,
@@ -934,7 +941,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             repartitionValueSerde,
             Collections.singleton(repartitionedSourceName),
             false,
-            optimizableRepartitionNode,
+            repartitionNode,
             builder);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 2e7fa5c..b69a854 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -77,6 +79,38 @@ public class KStreamKStreamJoinTest {
         shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
     }
 
+
+    @Test
+    public void shouldReuseRepartitionTopicWithGeneratedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to");
+        assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString());
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+        final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
+        final Topology topology =  builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString());
+    }
+
     private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtInMetricsVersion) {
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -1557,4 +1591,122 @@ public class KStreamKStreamJoinTest {
     }
 
 
+    private final String expectedTopologyWithUserNamedRepartitionTopics = "Topologies:\n" +
+            "   Sub-topology: 0\n" +
+            "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +
+            "      --> KSTREAM-MAP-0000000003\n" +
+            "    Processor: KSTREAM-MAP-0000000003 (stores: [])\n" +
+            "      --> second-join-left-repartition-filter, first-join-left-repartition-filter\n" +
+            "      <-- KSTREAM-SOURCE-0000000000\n" +
+            "    Processor: first-join-left-repartition-filter (stores: [])\n" +
+            "      --> first-join-left-repartition-sink\n" +
+            "      <-- KSTREAM-MAP-0000000003\n" +
+            "    Processor: second-join-left-repartition-filter (stores: [])\n" +
+            "      --> second-join-left-repartition-sink\n" +
+            "      <-- KSTREAM-MAP-0000000003\n" +
+            "    Sink: first-join-left-repartition-sink (topic: first-join-left-repartition)\n" +
+            "      <-- first-join-left-repartition-filter\n" +
+            "    Sink: second-join-left-repartition-sink (topic: second-join-left-repartition)\n" +
+            "      <-- second-join-left-repartition-filter\n" +
+            "\n" +
+            "  Sub-topology: 1\n" +
+            "    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n" +
+            "      --> first-join-other-windowed\n" +
+            "    Source: first-join-left-repartition-source (topics: [first-join-left-repartition])\n" +
+            "      --> first-join-this-windowed\n" +
+            "    Processor: first-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
+            "      --> first-join-other-join\n" +
+            "      <-- KSTREAM-SOURCE-0000000001\n" +
+            "    Processor: first-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
+            "      --> first-join-this-join\n" +
+            "      <-- first-join-left-repartition-source\n" +
+            "    Processor: first-join-other-join (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
+            "      --> first-join-merge\n" +
+            "      <-- first-join-other-windowed\n" +
+            "    Processor: first-join-this-join (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
+            "      --> first-join-merge\n" +
+            "      <-- first-join-this-windowed\n" +
+            "    Processor: first-join-merge (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000012\n" +
+            "      <-- first-join-this-join, first-join-other-join\n" +
+            "    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" +
+            "      <-- first-join-merge\n" +
+            "\n" +
+            "  Sub-topology: 2\n" +
+            "    Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n" +
+            "      --> second-join-other-windowed\n" +
+            "    Source: second-join-left-repartition-source (topics: [second-join-left-repartition])\n" +
+            "      --> second-join-this-windowed\n" +
+            "    Processor: second-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
+            "      --> second-join-other-join\n" +
+            "      <-- KSTREAM-SOURCE-0000000002\n" +
+            "    Processor: second-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
+            "      --> second-join-this-join\n" +
+            "      <-- second-join-left-repartition-source\n" +
+            "    Processor: second-join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
+            "      --> second-join-merge\n" +
+            "      <-- second-join-other-windowed\n" +
+            "    Processor: second-join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
+            "      --> second-join-merge\n" +
+            "      <-- second-join-this-windowed\n" +
+            "    Processor: second-join-merge (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000021\n" +
+            "      <-- second-join-this-join, second-join-other-join\n" +
+            "    Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n" +
+            "      <-- second-join-merge\n\n";
+    
+    private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n" +
+            "   Sub-topology: 0\n" +
+            "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +
+            "      --> KSTREAM-MAP-0000000003\n" +
+            "    Processor: KSTREAM-MAP-0000000003 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000005\n" +
+            "      <-- KSTREAM-SOURCE-0000000000\n" +
+            "    Processor: KSTREAM-FILTER-0000000005 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000004\n" +
+            "      <-- KSTREAM-MAP-0000000003\n" +
+            "    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-MAP-0000000003-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000005\n" +
+            "\n" +
+            "  Sub-topology: 1\n" +
+            "    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-MAP-0000000003-repartition])\n" +
+            "      --> KSTREAM-WINDOWED-0000000007, KSTREAM-WINDOWED-0000000016\n" +
+            "    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n" +
+            "      --> KSTREAM-WINDOWED-0000000008\n" +
+            "    Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n" +
+            "      --> KSTREAM-WINDOWED-0000000017\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
+            "      --> KSTREAM-JOINTHIS-0000000009\n" +
+            "      <-- KSTREAM-SOURCE-0000000006\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000008 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
+            "      --> KSTREAM-JOINOTHER-0000000010\n" +
+            "      <-- KSTREAM-SOURCE-0000000001\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000016 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
+            "      --> KSTREAM-JOINTHIS-0000000018\n" +
+            "      <-- KSTREAM-SOURCE-0000000006\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000017 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
+            "      --> KSTREAM-JOINOTHER-0000000019\n" +
+            "      <-- KSTREAM-SOURCE-0000000002\n" +
+            "    Processor: KSTREAM-JOINOTHER-0000000010 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
+            "      --> KSTREAM-MERGE-0000000011\n" +
+            "      <-- KSTREAM-WINDOWED-0000000008\n" +
+            "    Processor: KSTREAM-JOINOTHER-0000000019 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
+            "      --> KSTREAM-MERGE-0000000020\n" +
+            "      <-- KSTREAM-WINDOWED-0000000017\n" +
+            "    Processor: KSTREAM-JOINTHIS-0000000009 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
+            "      --> KSTREAM-MERGE-0000000011\n" +
+            "      <-- KSTREAM-WINDOWED-0000000007\n" +
+            "    Processor: KSTREAM-JOINTHIS-0000000018 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
+            "      --> KSTREAM-MERGE-0000000020\n" +
+            "      <-- KSTREAM-WINDOWED-0000000016\n" +
+            "    Processor: KSTREAM-MERGE-0000000011 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000012\n" +
+            "      <-- KSTREAM-JOINTHIS-0000000009, KSTREAM-JOINOTHER-0000000010\n" +
+            "    Processor: KSTREAM-MERGE-0000000020 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000021\n" +
+            "      <-- KSTREAM-JOINTHIS-0000000018, KSTREAM-JOINOTHER-0000000019\n" +
+            "    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" +
+            "      <-- KSTREAM-MERGE-0000000011\n" +
+            "    Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n" +
+            "      <-- KSTREAM-MERGE-0000000020\n\n";
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 7bee745..d1b70d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -16,19 +16,35 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -37,20 +53,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-
-import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-
 public class KStreamKTableJoinTest {
     private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
 
@@ -113,6 +115,38 @@ public class KStreamKTableJoinTest {
     }
 
     @Test
+    public void shouldReuseRepartitionTopicWithGeneratedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
+        rekeyedStream.join(tableB, (value1, value2) -> value1 + value2).to("out-one");
+        rekeyedStream.join(tableC, (value1, value2) -> value1 + value2).to("out-two");
+        final Topology topology = builder.build(props);
+        assertEquals(expectedTopologyWithGeneratedRepartitionTopicNames, topology.describe().toString());
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
+
+        rekeyedStream.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join")).to("out-one");
+        rekeyedStream.join(tableC, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "second-join")).to("out-two");
+        final Topology topology = builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserProvidedRepartitionTopicNames, topology.describe().toString());
+    }
+
+    @Test
     public void shouldRequireCopartitionedStreams() {
         final Collection<Set<String>> copartitionGroups =
             TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
@@ -256,4 +290,91 @@ public class KStreamKTableJoinTest {
                 + "offset=[0]")
         );
     }
+
+
+    private final String expectedTopologyWithGeneratedRepartitionTopicNames =
+        "Topologies:\n"
+        + "   Sub-topology: 0\n"
+        + "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+        + "      --> KSTREAM-MAP-0000000007\n"
+        + "    Processor: KSTREAM-MAP-0000000007 (stores: [])\n"
+        + "      --> KSTREAM-FILTER-0000000009\n"
+        + "      <-- KSTREAM-SOURCE-0000000000\n"
+        + "    Processor: KSTREAM-FILTER-0000000009 (stores: [])\n"
+        + "      --> KSTREAM-SINK-0000000008\n"
+        + "      <-- KSTREAM-MAP-0000000007\n"
+        + "    Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-MAP-0000000007-repartition)\n"
+        + "      <-- KSTREAM-FILTER-0000000009\n"
+        + "\n"
+        + "  Sub-topology: 1\n"
+        + "    Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-MAP-0000000007-repartition])\n"
+        + "      --> KSTREAM-JOIN-0000000011, KSTREAM-JOIN-0000000016\n"
+        + "    Processor: KSTREAM-JOIN-0000000011 (stores: [topic2-STATE-STORE-0000000001])\n"
+        + "      --> KSTREAM-SINK-0000000012\n"
+        + "      <-- KSTREAM-SOURCE-0000000010\n"
+        + "    Processor: KSTREAM-JOIN-0000000016 (stores: [topic3-STATE-STORE-0000000004])\n"
+        + "      --> KSTREAM-SINK-0000000017\n"
+        + "      <-- KSTREAM-SOURCE-0000000010\n"
+        + "    Source: KSTREAM-SOURCE-0000000002 (topics: [topic2])\n"
+        + "      --> KTABLE-SOURCE-0000000003\n"
+        + "    Source: KSTREAM-SOURCE-0000000005 (topics: [topic3])\n"
+        + "      --> KTABLE-SOURCE-0000000006\n"
+        + "    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n"
+        + "      <-- KSTREAM-JOIN-0000000011\n"
+        + "    Sink: KSTREAM-SINK-0000000017 (topic: out-two)\n"
+        + "      <-- KSTREAM-JOIN-0000000016\n"
+        + "    Processor: KTABLE-SOURCE-0000000003 (stores: [topic2-STATE-STORE-0000000001])\n"
+        + "      --> none\n"
+        + "      <-- KSTREAM-SOURCE-0000000002\n"
+        + "    Processor: KTABLE-SOURCE-0000000006 (stores: [topic3-STATE-STORE-0000000004])\n"
+        + "      --> none\n"
+        + "      <-- KSTREAM-SOURCE-0000000005\n\n";
+
+
+    private final String expectedTopologyWithUserProvidedRepartitionTopicNames =
+            "Topologies:\n"
+                    + "   Sub-topology: 0\n"
+                    + "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+                    + "      --> KSTREAM-MAP-0000000007\n"
+                    + "    Processor: KSTREAM-MAP-0000000007 (stores: [])\n"
+                    + "      --> first-join-repartition-filter, second-join-repartition-filter\n"
+                    + "      <-- KSTREAM-SOURCE-0000000000\n"
+                    + "    Processor: first-join-repartition-filter (stores: [])\n"
+                    + "      --> first-join-repartition-sink\n"
+                    + "      <-- KSTREAM-MAP-0000000007\n"
+                    + "    Processor: second-join-repartition-filter (stores: [])\n"
+                    + "      --> second-join-repartition-sink\n"
+                    + "      <-- KSTREAM-MAP-0000000007\n"
+                    + "    Sink: first-join-repartition-sink (topic: first-join-repartition)\n"
+                    + "      <-- first-join-repartition-filter\n"
+                    + "    Sink: second-join-repartition-sink (topic: second-join-repartition)\n"
+                    + "      <-- second-join-repartition-filter\n"
+                    + "\n"
+                    + "  Sub-topology: 1\n"
+                    + "    Source: first-join-repartition-source (topics: [first-join-repartition])\n"
+                    + "      --> first-join\n"
+                    + "    Source: KSTREAM-SOURCE-0000000002 (topics: [topic2])\n"
+                    + "      --> KTABLE-SOURCE-0000000003\n"
+                    + "    Processor: first-join (stores: [topic2-STATE-STORE-0000000001])\n"
+                    + "      --> KSTREAM-SINK-0000000012\n"
+                    + "      <-- first-join-repartition-source\n"
+                    + "    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n"
+                    + "      <-- first-join\n"
+                    + "    Processor: KTABLE-SOURCE-0000000003 (stores: [topic2-STATE-STORE-0000000001])\n"
+                    + "      --> none\n"
+                    + "      <-- KSTREAM-SOURCE-0000000002\n"
+                    + "\n"
+                    + "  Sub-topology: 2\n"
+                    + "    Source: second-join-repartition-source (topics: [second-join-repartition])\n"
+                    + "      --> second-join\n"
+                    + "    Source: KSTREAM-SOURCE-0000000005 (topics: [topic3])\n"
+                    + "      --> KTABLE-SOURCE-0000000006\n"
+                    + "    Processor: second-join (stores: [topic3-STATE-STORE-0000000004])\n"
+                    + "      --> KSTREAM-SINK-0000000017\n"
+                    + "      <-- second-join-repartition-source\n"
+                    + "    Sink: KSTREAM-SINK-0000000017 (topic: out-two)\n"
+                    + "      <-- second-join\n"
+                    + "    Processor: KTABLE-SOURCE-0000000006 (stores: [topic3-STATE-STORE-0000000004])\n"
+                    + "      --> none\n"
+                    + "      <-- KSTREAM-SOURCE-0000000005\n\n";
 }