You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/20 22:26:40 UTC

[GitHub] [kafka] bbejeck commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

bbejeck commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r428343055



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -77,6 +79,38 @@ public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers
         shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
     }
 
+
+    @Test
+    public void shouldReuseRepartitionTopicWithGeneratedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to");
+        assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString());
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+        final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
+        final Topology topology =  builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString());

Review comment:
       >This could be fixed by inserting a repartition() i the new code enforcing the old name -- however, this make me wonder if we might want to throw a "naming conflict" (ie, cannot pick a name) exception based on the original topology for this case when both operators are named, and tell people to insert repartition() right away? For this case, if they later remove a join it's clear what is happening to them.
   
   I see your point, but I think that is a bad user experience and IMHO leaks too much detail about an operation we want to handle automatically.
   
   I'm leaning towards the simpler case of what we had before.  With generated names re-use the reputation node, but if the user creates a new join with explicit names, just go ahead and create two repartition topics.
   
   WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org