You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/05/27 23:27:31 UTC
[kafka] branch 2.5 updated: KAFKA-9298: reuse mapped stream error
in joins (#8504)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new f5c1da8 KAFKA-9298: reuse mapped stream error in joins (#8504)
f5c1da8 is described below
commit f5c1da8f5520ffff900d5cc92ea30efec7e61845
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed May 27 14:54:41 2020 -0400
KAFKA-9298: reuse mapped stream error in joins (#8504)
Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../streams/kstream/internals/KStreamImpl.java | 13 +-
.../kstream/internals/KStreamKStreamJoinTest.java | 152 +++++++++++++++++++++
.../kstream/internals/KStreamKTableJoinTest.java | 151 ++++++++++++++++++--
3 files changed, 298 insertions(+), 18 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 308aafe..e72dc17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -120,6 +120,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
private final boolean repartitionRequired;
+ private OptimizableRepartitionNode<K, V> repartitionNode;
+
KStreamImpl(final String name,
final Serde<K> keySerde,
final Serde<V> valueSerde,
@@ -918,6 +920,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde;
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder =
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+ // we still need to create the repartitioned source each time
+ // as it increments the counter which
+ // is needed to maintain topology compatibility
final String repartitionedSourceName = createRepartitionedSource(
builder,
repartitionKeySerde,
@@ -925,8 +930,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
repartitionName,
optimizableRepartitionNodeBuilder);
- final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
- builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+ if (repartitionNode == null || !name.equals(repartitionName)) {
+ repartitionNode = optimizableRepartitionNodeBuilder.build();
+ builder.addGraphNode(streamsGraphNode, repartitionNode);
+ }
return new KStreamImpl<>(
repartitionedSourceName,
@@ -934,7 +941,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
repartitionValueSerde,
Collections.singleton(repartitionedSourceName),
false,
- optimizableRepartitionNode,
+ repartitionNode,
builder);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 2e7fa5c..b69a854 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.StreamsException;
@@ -77,6 +79,38 @@ public class KStreamKStreamJoinTest {
shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
}
+
+ @Test
+ public void shouldReuseRepartitionTopicWithGeneratedName() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final Properties props = new Properties();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+ final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+ newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
+ newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to");
+ assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString());
+ }
+
+ @Test
+ public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final Properties props = new Properties();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+ final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+ final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+ newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
+ newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
+ final Topology topology = builder.build(props);
+ System.out.println(topology.describe().toString());
+ assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString());
+ }
+
private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtInMetricsVersion) {
final StreamsBuilder builder = new StreamsBuilder();
@@ -1557,4 +1591,122 @@ public class KStreamKStreamJoinTest {
}
+ private final String expectedTopologyWithUserNamedRepartitionTopics = "Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +
+ " --> KSTREAM-MAP-0000000003\n" +
+ " Processor: KSTREAM-MAP-0000000003 (stores: [])\n" +
+ " --> second-join-left-repartition-filter, first-join-left-repartition-filter\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n" +
+ " Processor: first-join-left-repartition-filter (stores: [])\n" +
+ " --> first-join-left-repartition-sink\n" +
+ " <-- KSTREAM-MAP-0000000003\n" +
+ " Processor: second-join-left-repartition-filter (stores: [])\n" +
+ " --> second-join-left-repartition-sink\n" +
+ " <-- KSTREAM-MAP-0000000003\n" +
+ " Sink: first-join-left-repartition-sink (topic: first-join-left-repartition)\n" +
+ " <-- first-join-left-repartition-filter\n" +
+ " Sink: second-join-left-repartition-sink (topic: second-join-left-repartition)\n" +
+ " <-- second-join-left-repartition-filter\n" +
+ "\n" +
+ " Sub-topology: 1\n" +
+ " Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n" +
+ " --> first-join-other-windowed\n" +
+ " Source: first-join-left-repartition-source (topics: [first-join-left-repartition])\n" +
+ " --> first-join-this-windowed\n" +
+ " Processor: first-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
+ " --> first-join-other-join\n" +
+ " <-- KSTREAM-SOURCE-0000000001\n" +
+ " Processor: first-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
+ " --> first-join-this-join\n" +
+ " <-- first-join-left-repartition-source\n" +
+ " Processor: first-join-other-join (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
+ " --> first-join-merge\n" +
+ " <-- first-join-other-windowed\n" +
+ " Processor: first-join-this-join (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
+ " --> first-join-merge\n" +
+ " <-- first-join-this-windowed\n" +
+ " Processor: first-join-merge (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000012\n" +
+ " <-- first-join-this-join, first-join-other-join\n" +
+ " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" +
+ " <-- first-join-merge\n" +
+ "\n" +
+ " Sub-topology: 2\n" +
+ " Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n" +
+ " --> second-join-other-windowed\n" +
+ " Source: second-join-left-repartition-source (topics: [second-join-left-repartition])\n" +
+ " --> second-join-this-windowed\n" +
+ " Processor: second-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
+ " --> second-join-other-join\n" +
+ " <-- KSTREAM-SOURCE-0000000002\n" +
+ " Processor: second-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
+ " --> second-join-this-join\n" +
+ " <-- second-join-left-repartition-source\n" +
+ " Processor: second-join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
+ " --> second-join-merge\n" +
+ " <-- second-join-other-windowed\n" +
+ " Processor: second-join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
+ " --> second-join-merge\n" +
+ " <-- second-join-this-windowed\n" +
+ " Processor: second-join-merge (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000021\n" +
+ " <-- second-join-this-join, second-join-other-join\n" +
+ " Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n" +
+ " <-- second-join-merge\n\n";
+
+ private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +
+ " --> KSTREAM-MAP-0000000003\n" +
+ " Processor: KSTREAM-MAP-0000000003 (stores: [])\n" +
+ " --> KSTREAM-FILTER-0000000005\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n" +
+ " Processor: KSTREAM-FILTER-0000000005 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000004\n" +
+ " <-- KSTREAM-MAP-0000000003\n" +
+ " Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-MAP-0000000003-repartition)\n" +
+ " <-- KSTREAM-FILTER-0000000005\n" +
+ "\n" +
+ " Sub-topology: 1\n" +
+ " Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-MAP-0000000003-repartition])\n" +
+ " --> KSTREAM-WINDOWED-0000000007, KSTREAM-WINDOWED-0000000016\n" +
+ " Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n" +
+ " --> KSTREAM-WINDOWED-0000000008\n" +
+ " Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n" +
+ " --> KSTREAM-WINDOWED-0000000017\n" +
+ " Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
+ " --> KSTREAM-JOINTHIS-0000000009\n" +
+ " <-- KSTREAM-SOURCE-0000000006\n" +
+ " Processor: KSTREAM-WINDOWED-0000000008 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
+ " --> KSTREAM-JOINOTHER-0000000010\n" +
+ " <-- KSTREAM-SOURCE-0000000001\n" +
+ " Processor: KSTREAM-WINDOWED-0000000016 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
+ " --> KSTREAM-JOINTHIS-0000000018\n" +
+ " <-- KSTREAM-SOURCE-0000000006\n" +
+ " Processor: KSTREAM-WINDOWED-0000000017 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
+ " --> KSTREAM-JOINOTHER-0000000019\n" +
+ " <-- KSTREAM-SOURCE-0000000002\n" +
+ " Processor: KSTREAM-JOINOTHER-0000000010 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n" +
+ " --> KSTREAM-MERGE-0000000011\n" +
+ " <-- KSTREAM-WINDOWED-0000000008\n" +
+ " Processor: KSTREAM-JOINOTHER-0000000019 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n" +
+ " --> KSTREAM-MERGE-0000000020\n" +
+ " <-- KSTREAM-WINDOWED-0000000017\n" +
+ " Processor: KSTREAM-JOINTHIS-0000000009 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n" +
+ " --> KSTREAM-MERGE-0000000011\n" +
+ " <-- KSTREAM-WINDOWED-0000000007\n" +
+ " Processor: KSTREAM-JOINTHIS-0000000018 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n" +
+ " --> KSTREAM-MERGE-0000000020\n" +
+ " <-- KSTREAM-WINDOWED-0000000016\n" +
+ " Processor: KSTREAM-MERGE-0000000011 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000012\n" +
+ " <-- KSTREAM-JOINTHIS-0000000009, KSTREAM-JOINOTHER-0000000010\n" +
+ " Processor: KSTREAM-MERGE-0000000020 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000021\n" +
+ " <-- KSTREAM-JOINTHIS-0000000018, KSTREAM-JOINOTHER-0000000019\n" +
+ " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n" +
+ " <-- KSTREAM-MERGE-0000000011\n" +
+ " Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n" +
+ " <-- KSTREAM-MERGE-0000000020\n\n";
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 7bee745..d1b70d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -16,19 +16,35 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -37,20 +53,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-
-import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-
public class KStreamKTableJoinTest {
private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@@ -113,6 +115,38 @@ public class KStreamKTableJoinTest {
}
@Test
+ public void shouldReuseRepartitionTopicWithGeneratedName() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final Properties props = new Properties();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+ final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+ final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+ final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
+ rekeyedStream.join(tableB, (value1, value2) -> value1 + value2).to("out-one");
+ rekeyedStream.join(tableC, (value1, value2) -> value1 + value2).to("out-two");
+ final Topology topology = builder.build(props);
+ assertEquals(expectedTopologyWithGeneratedRepartitionTopicNames, topology.describe().toString());
+ }
+
+ @Test
+ public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final Properties props = new Properties();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+ final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+ final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+ final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
+
+ rekeyedStream.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join")).to("out-one");
+ rekeyedStream.join(tableC, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "second-join")).to("out-two");
+ final Topology topology = builder.build(props);
+ System.out.println(topology.describe().toString());
+ assertEquals(expectedTopologyWithUserProvidedRepartitionTopicNames, topology.describe().toString());
+ }
+
+ @Test
public void shouldRequireCopartitionedStreams() {
final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
@@ -256,4 +290,91 @@ public class KStreamKTableJoinTest {
+ "offset=[0]")
);
}
+
+
+ private final String expectedTopologyWithGeneratedRepartitionTopicNames =
+ "Topologies:\n"
+ + " Sub-topology: 0\n"
+ + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+ + " --> KSTREAM-MAP-0000000007\n"
+ + " Processor: KSTREAM-MAP-0000000007 (stores: [])\n"
+ + " --> KSTREAM-FILTER-0000000009\n"
+ + " <-- KSTREAM-SOURCE-0000000000\n"
+ + " Processor: KSTREAM-FILTER-0000000009 (stores: [])\n"
+ + " --> KSTREAM-SINK-0000000008\n"
+ + " <-- KSTREAM-MAP-0000000007\n"
+ + " Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-MAP-0000000007-repartition)\n"
+ + " <-- KSTREAM-FILTER-0000000009\n"
+ + "\n"
+ + " Sub-topology: 1\n"
+ + " Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-MAP-0000000007-repartition])\n"
+ + " --> KSTREAM-JOIN-0000000011, KSTREAM-JOIN-0000000016\n"
+ + " Processor: KSTREAM-JOIN-0000000011 (stores: [topic2-STATE-STORE-0000000001])\n"
+ + " --> KSTREAM-SINK-0000000012\n"
+ + " <-- KSTREAM-SOURCE-0000000010\n"
+ + " Processor: KSTREAM-JOIN-0000000016 (stores: [topic3-STATE-STORE-0000000004])\n"
+ + " --> KSTREAM-SINK-0000000017\n"
+ + " <-- KSTREAM-SOURCE-0000000010\n"
+ + " Source: KSTREAM-SOURCE-0000000002 (topics: [topic2])\n"
+ + " --> KTABLE-SOURCE-0000000003\n"
+ + " Source: KSTREAM-SOURCE-0000000005 (topics: [topic3])\n"
+ + " --> KTABLE-SOURCE-0000000006\n"
+ + " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n"
+ + " <-- KSTREAM-JOIN-0000000011\n"
+ + " Sink: KSTREAM-SINK-0000000017 (topic: out-two)\n"
+ + " <-- KSTREAM-JOIN-0000000016\n"
+ + " Processor: KTABLE-SOURCE-0000000003 (stores: [topic2-STATE-STORE-0000000001])\n"
+ + " --> none\n"
+ + " <-- KSTREAM-SOURCE-0000000002\n"
+ + " Processor: KTABLE-SOURCE-0000000006 (stores: [topic3-STATE-STORE-0000000004])\n"
+ + " --> none\n"
+ + " <-- KSTREAM-SOURCE-0000000005\n\n";
+
+
+ private final String expectedTopologyWithUserProvidedRepartitionTopicNames =
+ "Topologies:\n"
+ + " Sub-topology: 0\n"
+ + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+ + " --> KSTREAM-MAP-0000000007\n"
+ + " Processor: KSTREAM-MAP-0000000007 (stores: [])\n"
+ + " --> first-join-repartition-filter, second-join-repartition-filter\n"
+ + " <-- KSTREAM-SOURCE-0000000000\n"
+ + " Processor: first-join-repartition-filter (stores: [])\n"
+ + " --> first-join-repartition-sink\n"
+ + " <-- KSTREAM-MAP-0000000007\n"
+ + " Processor: second-join-repartition-filter (stores: [])\n"
+ + " --> second-join-repartition-sink\n"
+ + " <-- KSTREAM-MAP-0000000007\n"
+ + " Sink: first-join-repartition-sink (topic: first-join-repartition)\n"
+ + " <-- first-join-repartition-filter\n"
+ + " Sink: second-join-repartition-sink (topic: second-join-repartition)\n"
+ + " <-- second-join-repartition-filter\n"
+ + "\n"
+ + " Sub-topology: 1\n"
+ + " Source: first-join-repartition-source (topics: [first-join-repartition])\n"
+ + " --> first-join\n"
+ + " Source: KSTREAM-SOURCE-0000000002 (topics: [topic2])\n"
+ + " --> KTABLE-SOURCE-0000000003\n"
+ + " Processor: first-join (stores: [topic2-STATE-STORE-0000000001])\n"
+ + " --> KSTREAM-SINK-0000000012\n"
+ + " <-- first-join-repartition-source\n"
+ + " Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n"
+ + " <-- first-join\n"
+ + " Processor: KTABLE-SOURCE-0000000003 (stores: [topic2-STATE-STORE-0000000001])\n"
+ + " --> none\n"
+ + " <-- KSTREAM-SOURCE-0000000002\n"
+ + "\n"
+ + " Sub-topology: 2\n"
+ + " Source: second-join-repartition-source (topics: [second-join-repartition])\n"
+ + " --> second-join\n"
+ + " Source: KSTREAM-SOURCE-0000000005 (topics: [topic3])\n"
+ + " --> KTABLE-SOURCE-0000000006\n"
+ + " Processor: second-join (stores: [topic3-STATE-STORE-0000000004])\n"
+ + " --> KSTREAM-SINK-0000000017\n"
+ + " <-- second-join-repartition-source\n"
+ + " Sink: KSTREAM-SINK-0000000017 (topic: out-two)\n"
+ + " <-- second-join\n"
+ + " Processor: KTABLE-SOURCE-0000000006 (stores: [topic3-STATE-STORE-0000000004])\n"
+ + " --> none\n"
+ + " <-- KSTREAM-SOURCE-0000000005\n\n";
}