You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/22 03:46:18 UTC

kafka git commit: KAFKA-2872: unite sink nodes with parent nodes in addSink

Repository: kafka
Updated Branches:
  refs/heads/trunk 3e0333d69 -> 84c8d2bb8


KAFKA-2872: unite sink nodes with parent nodes in addSink

Starting a KafkaStream was getting an error due to the fact that the TopologyBuilder.addSink method was not connecting the sink with it parent(s) processor/sources.  Just needed to wire up the sink with it parent(s) in TopologyBuilder.addSink .

Author: bbejeck <bb...@gmail.com>

Reviewers: Guozhang Wang

Closes #572 from bbejeck/KAFKA-2872_kafka_stream_sink_not_connected_to_parent


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84c8d2bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84c8d2bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84c8d2bb

Branch: refs/heads/trunk
Commit: 84c8d2bb86dc2794a3d6a86ae28b3cb51cea5c4b
Parents: 3e0333d
Author: Bill Bejeck <bb...@gmail.com>
Authored: Sat Nov 21 18:46:15 2015 -0800
Committer: wangguoz@gmail.com <gu...@Guozhang-Macbook.local>
Committed: Sat Nov 21 18:46:15 2015 -0800

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilder.java      |  2 +
 .../streams/processor/TopologyBuilderTest.java  | 42 +++++++++++++++++---
 2 files changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/84c8d2bb/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 893f7de..021a47f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -230,6 +230,8 @@ public class TopologyBuilder {
         }
 
         nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
+        nodeGrouper.add(name);
+        nodeGrouper.unite(name, parentNames);
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/84c8d2bb/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index b1b71b6..f6924ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.kafka.streams.processor;
 
-import static org.junit.Assert.assertEquals;
-
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.common.utils.Utils.mkList;
-
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -35,6 +30,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkList;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TopologyBuilderTest {
 
     @Test(expected = TopologyException.class)
@@ -100,6 +100,38 @@ public class TopologyBuilderTest {
     }
 
     @Test
+    public void testAddSinkConnectedWithParent() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source", "source-topic");
+        builder.addSink("sink", "dest-topic", "source");
+
+        Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
+        Set<String> nodeGroup = nodeGroups.get(0);
+
+        assertTrue(nodeGroup.contains("sink"));
+        assertTrue(nodeGroup.contains("source"));
+
+    }
+
+    @Test
+    public void testAddSinkConnectedWithMultipleParent() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source", "source-topic");
+        builder.addSource("sourceII", "source-topicII");
+        builder.addSink("sink", "dest-topic", "source", "sourceII");
+
+        Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
+        Set<String> nodeGroup = nodeGroups.get(0);
+
+        assertTrue(nodeGroup.contains("sink"));
+        assertTrue(nodeGroup.contains("source"));
+        assertTrue(nodeGroup.contains("sourceII"));
+
+    }
+
+    @Test
     public void testSourceTopics() {
         final TopologyBuilder builder = new TopologyBuilder();