You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/22 03:46:18 UTC
kafka git commit: KAFKA-2872: unite sink nodes with parent nodes in
addSink
Repository: kafka
Updated Branches:
refs/heads/trunk 3e0333d69 -> 84c8d2bb8
KAFKA-2872: unite sink nodes with parent nodes in addSink
Starting a KafkaStream was getting an error due to the fact that the TopologyBuilder.addSink method was not connecting the sink with it parent(s) processor/sources. Just needed to wire up the sink with it parent(s) in TopologyBuilder.addSink .
Author: bbejeck <bb...@gmail.com>
Reviewers: Guozhang Wang
Closes #572 from bbejeck/KAFKA-2872_kafka_stream_sink_not_connected_to_parent
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84c8d2bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84c8d2bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84c8d2bb
Branch: refs/heads/trunk
Commit: 84c8d2bb86dc2794a3d6a86ae28b3cb51cea5c4b
Parents: 3e0333d
Author: Bill Bejeck <bb...@gmail.com>
Authored: Sat Nov 21 18:46:15 2015 -0800
Committer: wangguoz@gmail.com <gu...@Guozhang-Macbook.local>
Committed: Sat Nov 21 18:46:15 2015 -0800
----------------------------------------------------------------------
.../streams/processor/TopologyBuilder.java | 2 +
.../streams/processor/TopologyBuilderTest.java | 42 +++++++++++++++++---
2 files changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/84c8d2bb/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 893f7de..021a47f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -230,6 +230,8 @@ public class TopologyBuilder {
}
nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
+ nodeGrouper.add(name);
+ nodeGrouper.unite(name, parentNames);
return this;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/84c8d2bb/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index b1b71b6..f6924ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -17,11 +17,6 @@
package org.apache.kafka.streams.processor;
-import static org.junit.Assert.assertEquals;
-
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.common.utils.Utils.mkList;
-
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -35,6 +30,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkList;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class TopologyBuilderTest {
@Test(expected = TopologyException.class)
@@ -100,6 +100,38 @@ public class TopologyBuilderTest {
}
@Test
+ public void testAddSinkConnectedWithParent() {
+ final TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addSource("source", "source-topic");
+ builder.addSink("sink", "dest-topic", "source");
+
+ Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
+ Set<String> nodeGroup = nodeGroups.get(0);
+
+ assertTrue(nodeGroup.contains("sink"));
+ assertTrue(nodeGroup.contains("source"));
+
+ }
+
+ @Test
+ public void testAddSinkConnectedWithMultipleParent() {
+ final TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addSource("source", "source-topic");
+ builder.addSource("sourceII", "source-topicII");
+ builder.addSink("sink", "dest-topic", "source", "sourceII");
+
+ Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
+ Set<String> nodeGroup = nodeGroups.get(0);
+
+ assertTrue(nodeGroup.contains("sink"));
+ assertTrue(nodeGroup.contains("source"));
+ assertTrue(nodeGroup.contains("sourceII"));
+
+ }
+
+ @Test
public void testSourceTopics() {
final TopologyBuilder builder = new TopologyBuilder();