You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/29 02:30:56 UTC

[kafka] branch 1.1 updated: KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 7f69d17  KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959)
7f69d17 is described below

commit 7f69d1767e3e0fd51416f65ff6a9ba1a65ed74dc
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Nov 28 21:15:26 2018 -0500

    KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959)
    
    This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results.
    
    I've added a test which fails without the fix.
    
    Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../streams/kstream/internals/KStreamImpl.java     |  3 ++-
 .../streams/kstream/internals/KStreamImplTest.java | 23 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8aaab49..549f3ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -779,8 +779,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
         final String name = builder.newProcessorName(LEFTJOIN_NAME);
+
         builder.internalTopologyBuilder.addProcessor(name, new KStreamGlobalKTableJoin<>(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name);
-        return new KStreamImpl<>(builder, name, sourceNodes, false);
+        return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 2009806..95b364d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -53,11 +53,15 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 
@@ -255,6 +259,25 @@ public class KStreamImplTest {
             }
         }
     }
+
+    @Test
+    public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final GlobalKTable<String, String> globalKTable = builder.globalTable("globalTopic");
+        builder.<String, String>stream("topic").selectKey(MockMapper.<Object, String>selectValueMapper())
+            .join(globalKTable, MockMapper.<Object, String>selectValueMapper(), MockValueJoiner.<String, String>instance("."))
+            .groupByKey()
+            .count();
+
+        final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+        final String topology = builder.build().describe().toString();
+        final Matcher matcher = repartitionTopicPattern.matcher(topology);
+        assertTrue(matcher.find());
+        final String match = matcher.group();
+        assertThat(match, notNullValue());
+        assertTrue(match.endsWith("repartition"));
+
+    }
     
     @Test
     public void testToWithNullValueSerdeDoesntNPE() {