You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/12/22 06:56:41 UTC

[kafka] branch trunk updated: MINOR: Add test demonstrating re-use of KGroupedStream with Optimizations enabled (#6050)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4616c0a  MINOR: Add test demonstrating re-use of KGroupedStream with Optimizations enabled (#6050)
4616c0a is described below

commit 4616c0aaff5766b6305baeed521efdfaae0094e8
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Sat Dec 22 01:56:32 2018 -0500

    MINOR: Add test demonstrating re-use of KGroupedStream with Optimizations enabled (#6050)
    
    Right now, if a repartition is required and users choose to name the repartition topic for an aggregation, e.g. kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")); the resulting KGroupedStream can't be reused
    when optimizations are disabled, as Streams will attempt to create two repartition topics with the same name.
    
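    However, if optimizations are enabled, then the resulting KGroupedStream can be re-used.
    For example, the following will work if optimizations are enabled:
    
        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();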
    
    This PR provides a unit test proving as much.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../streams/kstream/RepartitionTopicNamingTest.java      | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 929bc3f..fe75191 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -53,6 +53,7 @@ public class RepartitionTopicNamingTest {
     private final String secondRepartitionTopicName = "aggregate-stream";
     private final String thirdRepartitionTopicName = "reduced-stream";
     private final String fourthRepartitionTopicName = "joined-stream";
+    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
 
 
     @Test
@@ -60,7 +61,6 @@ public class RepartitionTopicNamingTest {
 
         final String optimizedTopology = buildTopology(StreamsConfig.OPTIMIZE).describe().toString();
         final String unOptimizedTopology = buildTopology(StreamsConfig.NO_OPTIMIZATION).describe().toString();
-        final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
 
         assertThat(optimizedTopology, is(EXPECTED_OPTIMIZED_TOPOLOGY));
         // only one repartition topic
@@ -95,7 +95,7 @@ public class RepartitionTopicNamingTest {
     }
 
     // each KGroupedStream will result in repartition, can't reuse
-    // KGroupedStreams when specifying repartition topic names
+    // KGroupedStreams when specifying repartition topic names and Optimization is turned off
     // need to have separate groupByKey calls when naming repartition topics
     // see test shouldHandleUniqueGroupedInstances below for an example
     @Test
@@ -112,6 +112,18 @@ public class RepartitionTopicNamingTest {
         }
     }
 
+    @Test
+    public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimizationsOn() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
+        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        final Topology topology = builder.build(properties);
+        assertThat(getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern), is(1));
+    }
+
 
     // can't use same repartition topic name in joins
     @Test