Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/26 08:42:58 UTC

kafka git commit: KAFKA-5765; Move merge() from StreamsBuilder to KStream

Repository: kafka
Updated Branches:
  refs/heads/trunk 4e43a7231 -> b8be86b80


KAFKA-5765; Move merge() from StreamsBuilder to KStream

This is the polished version.
1. The old merge() method in StreamsBuilder has been removed.
2. The merge() method in KStreamBuilder has been changed to chain calls to the new single-argument KStream#merge()
rather than delegating to the multi-argument (varargs) merge() in KStreamImpl.
3. A merge() method taking a single (final) KStream parameter has been added to KStream, and tests have been added to verify its correctness.
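For callers, removing StreamsBuilder#merge() means that a multi-stream call such as builder.merge(s1, s2, s3) becomes a chain of single-argument calls, s1.merge(s2).merge(s3), which is how the updated tests in this commit are rewritten. The sketch below illustrates the migration with the JDK only. ToyStream is a hypothetical stand-in for KStream that records how it was built rather than carrying any data, and mergeAll is a hypothetical helper (not part of Kafka Streams) that folds a List of streams with java.util.stream.Stream#reduce.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for KStream: it records how it was built, not any data.
final class ToyStream {
    private final String label;

    ToyStream(final String label) {
        this.label = label;
    }

    // Same shape as the new KStream#merge(KStream): exactly one other stream per call.
    ToyStream merge(final ToyStream other) {
        return new ToyStream("merge(" + label + ", " + other.label + ")");
    }

    String label() {
        return label;
    }
}

final class MergeMigration {
    // Hypothetical helper: folds a list of streams left to right with pairwise merges.
    // An empty list yields Optional.empty(); a single stream is returned unchanged.
    static Optional<ToyStream> mergeAll(final List<ToyStream> streams) {
        return streams.stream().reduce(ToyStream::merge);
    }

    public static void main(final String[] args) {
        final ToyStream s1 = new ToyStream("s1");
        final ToyStream s2 = new ToyStream("s2");
        final ToyStream s3 = new ToyStream("s3");

        // Before (removed): builder.merge(s1, s2, s3)
        // After: explicit chaining ...
        System.out.println(s1.merge(s2).merge(s3).label()); // prints merge(merge(s1, s2), s3)
        // ... or a fold when the streams are held in a collection.
        System.out.println(mergeAll(Arrays.asList(s1, s2, s3)).get().label()); // prints merge(merge(s1, s2), s3)
    }
}
```

Chaining makes each merge step explicit: merging three inputs yields two merge steps, each combining exactly two parents.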

Author: Richard Yu <ri...@Richards-Air.attlocal.net>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #3916 from ConcurrencyPractitioner/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8be86b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8be86b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8be86b8

Branch: refs/heads/trunk
Commit: b8be86b80543e41fd3181a8de8f1a3ac0a72e4c5
Parents: 4e43a72
Author: Richard Yu <ri...@Richards-Air.attlocal.net>
Authored: Tue Sep 26 09:42:53 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Tue Sep 26 09:42:53 2017 +0100

----------------------------------------------------------------------
 docs/streams/upgrade-guide.html                 | 10 +++-
 .../apache/kafka/streams/StreamsBuilder.java    | 14 +----
 .../apache/kafka/streams/kstream/KStream.java   | 13 +++++
 .../kafka/streams/kstream/KStreamBuilder.java   | 10 +++-
 .../internals/InternalStreamsBuilder.java       |  5 --
 .../streams/kstream/internals/KStreamImpl.java  | 30 +++++------
 .../kafka/streams/StreamsBuilderTest.java       |  6 +--
 .../streams/kstream/KStreamBuilderTest.java     |  2 +-
 .../internals/InternalStreamsBuilderTest.java   |  2 +-
 .../kstream/internals/KStreamImplTest.java      | 55 ++++++++++++++++++++
 .../internals/StreamsMetadataStateTest.java     |  2 +-
 11 files changed, 107 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/docs/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index c90024f..c2835a3 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -84,7 +84,13 @@
         and can be obtained by calling <code>Topology#describe()</code>.
         An example using this new API is shown in the <a href="/{{version}}/documentation/streams/quickstart">quickstart section</a>.
     </p>
-
+    
+    <p>
+        With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream">KIP-202</a>,
+        a new method <code>merge()</code> has been added to <code>KStream</code>, replacing <code>StreamsBuilder#merge()</code>, which has been removed.
+        The method signature has also changed: instead of passing multiple <code>KStream</code>s to the method at once, only a single <code>KStream</code> is accepted; to merge more than two streams, chain the calls, e.g., <code>stream1.merge(stream2).merge(stream3)</code>.
+    </p>
+    
     <p>
         New methods in <code>KafkaStreams</code>:
     </p>
@@ -214,7 +220,9 @@
         If exactly-once processing is enabled via the <code>processing.guarantees</code> parameter, internally Streams switches from a producer per thread to a producer per task runtime model.
         In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID for this case.
         Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
+    
     </p>
+
     <p> Producer's <code>client.id</code> naming schema: </p>
     <ul>
         <li> at-least-once (default): <code>[client.Id]-StreamThread-[sequence-number]</code> </li>

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 7e746e6..94d19ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -59,7 +59,7 @@ public class StreamsBuilder {
     final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
 
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
-
+    
     /**
      * Create a {@link KStream} from the specified topics.
      * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@@ -493,18 +493,6 @@ public class StreamsBuilder {
     }
 
     /**
-     * Create a new instance of {@link KStream} by merging the given {@link KStream}s.
-     * <p>
-     * There is no ordering guarantee for records from different {@link KStream}s.
-     *
-     * @param streams the {@link KStream}s to be merged
-     * @return a {@link KStream} containing all records of the given streams
-     */
-    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
-        return internalStreamsBuilder.merge(streams);
-    }
-
-    /**
      * Returns the {@link Topology} that represents the specified processing logic.
      *
      * @return the {@link Topology} that represents the specified processing logic

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index f8f99f2..c56a4ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -446,6 +446,19 @@ public interface KStream<K, V> {
      * @param printed options for printing
      */
     void print(final Printed<K, V> printed);
+    
+    /**
+     * Merge this stream and the given stream into one larger stream.
+     * <p>
+     * There is no ordering guarantee between records from this {@code KStream} and records from
+     * the provided {@code KStream} in the merged stream.
+     * Relative order is preserved within each input stream, however (i.e., records within one input
+     * stream are processed in order).
+     *
+     * @param stream a stream which is to be merged into this stream
+     * @return a merged stream containing all records from this and the provided {@code KStream}
+     */
+    KStream<K, V> merge(final KStream<K, V> stream);
 
     /**
      * Write the records of this stream to a file at the given path.
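The ordering contract in the new KStream#merge() Javadoc (no guarantee across the two inputs, but relative order kept within each input) amounts to saying that the merged output must be some interleaving of the two inputs. The following JDK-only sketch checks that property with the classic interleaving dynamic program. It is not Kafka Streams code: records are modeled as strings, isValidMerge is a hypothetical helper name, and the sample values are borrowed from KStreamImplTest#shouldMergeTwoStreams.

```java
import java.util.Arrays;
import java.util.List;

final class MergeOrdering {
    // Returns true if 'merged' is an interleaving of 'left' and 'right': every record is
    // used exactly once and each input's records keep their relative order.
    // ok[i][j] == true means the first i records of 'left' and the first j records of
    // 'right' can produce the first i + j records of 'merged'.
    static boolean isValidMerge(final List<String> left,
                                final List<String> right,
                                final List<String> merged) {
        final int n = left.size();
        final int m = right.size();
        if (merged.size() != n + m) {
            return false;
        }
        final boolean[][] ok = new boolean[n + 1][m + 1];
        ok[0][0] = true;
        for (int i = 0; i <= n; i++) {
            for (int j = 0; j <= m; j++) {
                if (i > 0 && ok[i - 1][j] && left.get(i - 1).equals(merged.get(i + j - 1))) {
                    ok[i][j] = true;
                }
                if (j > 0 && ok[i][j - 1] && right.get(j - 1).equals(merged.get(i + j - 1))) {
                    ok[i][j] = true;
                }
            }
        }
        return ok[n][m];
    }

    public static void main(final String[] args) {
        final List<String> topic1 = Arrays.asList("A:aa", "D:dd");
        final List<String> topic2 = Arrays.asList("B:bb", "C:cc");

        // The order asserted by shouldMergeTwoStreams: a valid interleaving.
        System.out.println(isValidMerge(topic1, topic2, Arrays.asList("A:aa", "B:bb", "C:cc", "D:dd"))); // true
        // A different interleaving: also permitted by the contract.
        System.out.println(isValidMerge(topic1, topic2, Arrays.asList("B:bb", "A:aa", "D:dd", "C:cc"))); // true
        // D:dd before A:aa reorders topic-1's records: not permitted.
        System.out.println(isValidMerge(topic1, topic2, Arrays.asList("D:dd", "B:bb", "C:cc", "A:aa"))); // false
    }
}
```

The unit test asserts one specific interleaving; the Javadoc contract would also admit others, such as the second case above.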

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index d1dd6ac..e7bcc95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -1225,8 +1225,16 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @return a {@link KStream} containing all records of the given streams
      */
     public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
+        Objects.requireNonNull(streams, "streams can't be null");
+        if (streams.length <= 1) {
+            throw new IllegalArgumentException("Number of arguments required needs to be greater than one.");
+        }
         try {
-            return KStreamImpl.merge(internalStreamsBuilder, streams);
+            KStream<K, V> mergedStream = streams[0];
+            for (int i = 1; i < streams.length; i++) {
+                mergedStream = mergedStream.merge(streams[i]);
+            }
+            return mergedStream;
         } catch (final org.apache.kafka.streams.errors.TopologyException e) {
             throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
         }
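After this change, KStreamBuilder#merge(KStream...) rejects a null array or fewer than two streams and otherwise folds its arguments left to right, so merge(s1, s2, s3) builds s1.merge(s2).merge(s3). Note that a single stream, which the removed KStreamImpl.merge() accepted (it only rejected null or an empty array), now triggers an IllegalArgumentException. The JDK-only sketch below mirrors that control flow; Node and mergeVarargs are hypothetical names, Node#merge only records the shape of the merge tree instead of adding processors to a topology, and the TopologyException translation is left out.

```java
import java.util.Objects;

final class VarargsMergeShim {
    // Hypothetical stand-in for KStream that records the shape of its merge tree.
    static final class Node {
        final String shape;

        Node(final String shape) {
            this.shape = shape;
        }

        // Analogous to KStream#merge(KStream), which rejects a null argument.
        Node merge(final Node other) {
            Objects.requireNonNull(other);
            return new Node("(" + shape + " + " + other.shape + ")");
        }
    }

    // Mirrors the patched KStreamBuilder#merge(KStream...): reject a null array or fewer
    // than two streams, then fold the arguments left to right with pairwise merges.
    static Node mergeVarargs(final Node... streams) {
        Objects.requireNonNull(streams, "streams can't be null");
        if (streams.length <= 1) {
            throw new IllegalArgumentException("Number of arguments required needs to be greater than one.");
        }
        Node merged = streams[0];
        for (int i = 1; i < streams.length; i++) {
            merged = merged.merge(streams[i]);
        }
        return merged;
    }

    public static void main(final String[] args) {
        final Node s1 = new Node("s1");
        final Node s2 = new Node("s2");
        final Node s3 = new Node("s3");
        System.out.println(mergeVarargs(s1, s2, s3).shape); // prints ((s1 + s2) + s3)
        try {
            mergeVarargs(s1);
        } catch (final IllegalArgumentException e) {
            System.out.println("single stream rejected: " + e.getMessage());
        }
    }
}
```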

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 3963657..fa696fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -159,11 +159,6 @@ public class InternalStreamsBuilder {
         return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
     }
 
-
-    public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
-        return KStreamImpl.merge(this, streams);
-    }
-
     String newName(final String prefix) {
         return prefix + String.format("%010d", index.getAndIncrement());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index cbaf95a..ae3808e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -183,7 +183,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
     }
-
+    
     @Override
     public void print() {
         print(defaultKeyValueMapper, null, null, this.name);
@@ -346,24 +346,22 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return branchChildren;
     }
 
-    public static <K, V> KStream<K, V> merge(final InternalStreamsBuilder builder,
-                                             final KStream<K, V>[] streams) {
-        if (streams == null || streams.length == 0) {
-            throw new IllegalArgumentException("Parameter <streams> must not be null or has length zero");
-        }
-
+    @Override 
+    public KStream<K, V> merge(final KStream<K, V> stream) {
+        Objects.requireNonNull(stream);
+        return merge(builder, stream);
+    }
+    
+    private KStream<K, V> merge(final InternalStreamsBuilder builder,
+                                final KStream<K, V> stream) {
+        KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
         String name = builder.newName(MERGE_NAME);
-        String[] parentNames = new String[streams.length];
+        String[] parentNames = {this.name, streamImpl.name};
         Set<String> allSourceNodes = new HashSet<>();
-        boolean requireRepartitioning = false;
-
-        for (int i = 0; i < streams.length; i++) {
-            KStreamImpl<K, V> stream = (KStreamImpl<K, V>) streams[i];
 
-            parentNames[i] = stream.name;
-            requireRepartitioning |= stream.repartitionRequired;
-            allSourceNodes.addAll(stream.sourceNodes);
-        }
+        boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
+        allSourceNodes.addAll(sourceNodes);
+        allSourceNodes.addAll(streamImpl.sourceNodes);
 
         builder.internalTopologyBuilder.addProcessor(name, new KStreamPassThrough<>(), parentNames);
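The private KStreamImpl#merge() now derives the merged node's metadata from exactly two parents: the parent names are this stream's name and the other stream's name, the source-node set is the union of both inputs' source nodes, and repartitioning is required if either input requires it. The sketch below models only that bookkeeping with JDK types. StreamNode is hypothetical, the "KSTREAM-MERGE-" prefix is an assumed value for MERGE_NAME, and the zero-padded suffix follows the String.format("%010d", ...) call visible in InternalStreamsBuilder#newName. Unlike the real code, it does not cast the argument to KStreamImpl or register a KStreamPassThrough processor.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

final class MergeMetadata {
    // Hypothetical naming scheme modeled on InternalStreamsBuilder#newName.
    private static final AtomicInteger INDEX = new AtomicInteger(0);
    private static final String MERGE_PREFIX = "KSTREAM-MERGE-"; // assumed value

    static String newName(final String prefix) {
        return prefix + String.format("%010d", INDEX.getAndIncrement());
    }

    // Hypothetical model of the fields KStreamImpl#merge reads from each input.
    static final class StreamNode {
        final String name;
        final Set<String> sourceNodes;
        final boolean repartitionRequired;
        final String[] parentNames;

        StreamNode(final String name, final Set<String> sourceNodes,
                   final boolean repartitionRequired, final String... parentNames) {
            this.name = name;
            this.sourceNodes = Collections.unmodifiableSet(new HashSet<>(sourceNodes));
            this.repartitionRequired = repartitionRequired;
            this.parentNames = parentNames;
        }

        StreamNode merge(final StreamNode other) {
            final String mergedName = newName(MERGE_PREFIX);
            // Exactly two parents, mirroring {this.name, streamImpl.name}.
            final String[] parents = {name, other.name};
            // Union of both inputs' source nodes.
            final Set<String> allSourceNodes = new HashSet<>(sourceNodes);
            allSourceNodes.addAll(other.sourceNodes);
            // Repartitioning is needed if either input needs it.
            final boolean requireRepartitioning = repartitionRequired || other.repartitionRequired;
            return new StreamNode(mergedName, allSourceNodes, requireRepartitioning, parents);
        }
    }

    public static void main(final String[] args) {
        final StreamNode left = new StreamNode("source-a", Collections.singleton("source-a"), false);
        final StreamNode right = new StreamNode("map-b", Collections.singleton("source-b"), true);
        final StreamNode merged = left.merge(right);
        System.out.println(merged.name + " parents=" + String.join(",", merged.parentNames)
            + " sources=" + merged.sourceNodes + " repartition=" + merged.repartitionRequired);
    }
}
```

Compared with the removed varargs loop, the two-parent version no longer needs an accumulator for requireRepartitioning; a single boolean OR covers both inputs.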
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 4ce202b..33ede93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -94,7 +94,7 @@ public class StreamsBuilderTest {
         assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
         assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
     }
-
+    
     @Test
     public void testMerge() {
         final String topic1 = "topic-1";
@@ -102,7 +102,7 @@ public class StreamsBuilderTest {
 
         final KStream<String, String> source1 = builder.stream(topic1);
         final KStream<String, String> source2 = builder.stream(topic2);
-        final KStream<String, String> merged = builder.merge(source1, source2);
+        final KStream<String, String> merged = source1.merge(source2);
 
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
@@ -160,7 +160,7 @@ public class StreamsBuilderTest {
         assertThat(store.get(1L), equalTo("value1"));
         assertThat(store.get(2L), equalTo("value2"));
     }
-
+    
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
         builder.stream(Collections.<String>emptyList());

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 6a6d2a4..c0bfa99 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -185,7 +185,7 @@ public class KStreamBuilderTest {
             }
         });
 
-        final KStream<String, String> merged = builder.merge(processedSource1, processedSource2, source3);
+        final KStream<String, String> merged = processedSource1.merge(processedSource2).merge(source3);
         merged.groupByKey().count("my-table");
         final Map<String, List<String>> actual = builder.stateStoreNameToSourceTopics();
         assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 494e197..68d0e24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -130,7 +130,7 @@ public class InternalStreamsBuilderTest {
             }
         });
 
-        final KStream<String, String> merged = builder.merge(processedSource1, processedSource2, source3);
+        final KStream<String, String> merged = processedSource1.merge(processedSource2).merge(source3);
         merged.groupByKey().count("my-table");
         final Map<String, List<String>> actual = builder.internalTopologyBuilder.stateStoreNameToSourceTopics();
         assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index be1d865..0a0232c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -468,5 +469,59 @@ public class KStreamImplTest {
     public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() {
         testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10), null);
     }
+    
+    @Test
+    public void shouldMergeTwoStreams() {
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
+
+        final KStream<String, String> source1 = builder.stream(topic1);
+        final KStream<String, String> source2 = builder.stream(topic2);
+        final KStream<String, String> merged = source1.merge(source2);
+
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        merged.process(processorSupplier);
+
+        driver.setUp(builder);
+        driver.setTime(0L);
+
+        driver.process(topic1, "A", "aa");
+        driver.process(topic2, "B", "bb");
+        driver.process(topic2, "C", "cc");
+        driver.process(topic1, "D", "dd");
+
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+    }
+    
+    @Test
+    public void shouldMergeMultipleStreams() {
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
+        final String topic3 = "topic-3";
+        final String topic4 = "topic-4";
+
+        final KStream<String, String> source1 = builder.stream(topic1);
+        final KStream<String, String> source2 = builder.stream(topic2);
+        final KStream<String, String> source3 = builder.stream(topic3);
+        final KStream<String, String> source4 = builder.stream(topic4);
+        final KStream<String, String> merged = source1.merge(source2).merge(source3).merge(source4);
+
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        merged.process(processorSupplier);
 
+        driver.setUp(builder);
+        driver.setTime(0L);
+
+        driver.process(topic1, "A", "aa");
+        driver.process(topic2, "B", "bb");
+        driver.process(topic3, "C", "cc");
+        driver.process(topic4, "D", "dd");
+        driver.process(topic4, "E", "ee");
+        driver.process(topic3, "F", "ff");
+        driver.process(topic2, "G", "gg");
+        driver.process(topic1, "H", "hh");
+
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
+                     processorSupplier.processed);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8be86b8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index f3dbb32..8e5d90d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -83,7 +83,7 @@ public class StreamsMetadataStateTest {
                 .groupByKey()
                 .count("table-three");
 
-        builder.merge(one, two).groupByKey().count("merged-table");
+        one.merge(two).groupByKey().count("merged-table");
 
         builder.stream("topic-four").mapValues(new ValueMapper<Object, Object>() {
             @Override