You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/07 19:20:21 UTC
kafka git commit: KAFKA-3497: Streams ProcessorContext should support
forward() based on child name
Repository: kafka
Updated Branches:
refs/heads/trunk 99d232922 -> 8dbd688b1
KAFKA-3497: Streams ProcessorContext should support forward() based on child name
Author: Eno Thereska <en...@gmail.com>
Reviewers: Yuto Kawamura, Michael G. Noll, Guozhang Wang
Closes #1194 from enothereska/kafka-3497-forward
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8dbd688b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8dbd688b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8dbd688b
Branch: refs/heads/trunk
Commit: 8dbd688b1617968329087317fa6bde8b8df0392e
Parents: 99d2329
Author: Eno Thereska <en...@gmail.com>
Authored: Thu Apr 7 10:20:17 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 7 10:20:17 2016 -0700
----------------------------------------------------------------------
.../streams/processor/ProcessorContext.java | 9 ++++
.../internals/ProcessorContextImpl.java | 5 ++
.../processor/internals/StandbyContextImpl.java | 5 ++
.../streams/processor/internals/StreamTask.java | 16 ++++++
.../internals/ProcessorTopologyTest.java | 56 ++++++++++++++++++++
.../apache/kafka/test/KStreamTestDriver.java | 16 ++++++
.../apache/kafka/test/MockProcessorContext.java | 6 +++
7 files changed, 113 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 434996e..8bac3e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -106,10 +106,19 @@ public interface ProcessorContext {
* Forwards a key/value pair to one of the downstream processors designated by childIndex
* @param key key
* @param value value
+ * @param childIndex index in list of children of this node
*/
<K, V> void forward(K key, V value, int childIndex);
/**
+ * Forwards a key/value pair to the downstream processor designated by childName
+ * @param key key
+ * @param value value
+ * @param childName name of downstream processor
+ */
+ <K, V> void forward(K key, V value, String childName);
+
+ /**
* Requests a commit
*/
void commit();
http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 888b89e..5bda856 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -168,6 +168,11 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
}
@Override
+ public <K, V> void forward(K key, V value, String childName) {
+ task.forward(key, value, childName);
+ }
+
+ @Override
public void commit() {
task.needCommit();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 3ad06e2..d5a9683 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -142,6 +142,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
}
@Override
+ public <K, V> void forward(K key, V value, String childName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void commit() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 61aeced..a484980 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -342,4 +342,20 @@ public class StreamTask extends AbstractTask implements Punctuator {
}
}
+ @SuppressWarnings("unchecked")
+ public <K, V> void forward(K key, V value, String childName) {
+ ProcessorNode thisNode = currNode;
+ for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+ if (childNode.name().equals(childName)) {
+ currNode = childNode;
+ try {
+ childNode.process(key, value);
+ } finally {
+ currNode = thisNode;
+ }
+ break;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index ef08176..1095fcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -157,6 +157,28 @@ public class ProcessorTopologyTest {
}
@Test
+ public void testDrivingMultiplexByNameTopology() {
+ driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology());
+ driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
+
+ driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
+
+ driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)");
+ }
+
+ @Test
public void testDrivingStatefulTopology() {
String storeName = "entries";
driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
@@ -215,6 +237,13 @@ public class ProcessorTopologyTest {
.addSink("sink2", OUTPUT_TOPIC_2, "processor");
}
+ protected TopologyBuilder createMultiplexByNameTopology() {
+ return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+ .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
+ .addSink("sink0", OUTPUT_TOPIC_1, "processor")
+ .addSink("sink1", OUTPUT_TOPIC_2, "processor");
+ }
+
protected TopologyBuilder createStatefulTopology(String storeName) {
return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
.addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
@@ -268,6 +297,33 @@ public class ProcessorTopologyTest {
}
/**
+ * A processor that forwards slightly-modified messages to each named child.
+ * Note: the children are assumed to be named "sink{zero-based child index}", e.g., sink0, or sink1, etc.
+ */
+ protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
+
+ private final int numChildren;
+
+ public MultiplexByNameProcessor(int numChildren) {
+ this.numChildren = numChildren;
+ }
+
+ @Override
+ public void process(String key, String value) {
+ for (int i = 0; i != numChildren; ++i) {
+ context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
+ }
+ }
+
+ @Override
+ public void punctuate(long streamTime) {
+ for (int i = 0; i != numChildren; ++i) {
+ context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
+ }
+ }
+ }
+
+ /**
* A processor that stores each key-value pair in an in-memory key-value store registered with the context. When
* {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 0c56c26..5cfee6b 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -120,6 +120,22 @@ public class KStreamTestDriver {
}
}
+ @SuppressWarnings("unchecked")
+ public <K, V> void forward(K key, V value, String childName) {
+ ProcessorNode thisNode = currNode;
+ for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+ if (childNode.name().equals(childName)) {
+ currNode = childNode;
+ try {
+ childNode.process(key, value);
+ } finally {
+ currNode = thisNode;
+ }
+ break;
+ }
+ }
+ }
+
public Map<String, StateStore> allStateStores() {
return context.allStateStores();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index e57e1c7..d3b8081 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -159,6 +159,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
}
@Override
+ @SuppressWarnings("unchecked")
+ public <K, V> void forward(K key, V value, String childName) {
+ driver.forward(key, value, childName);
+ }
+
+ @Override
public void commit() {
throw new UnsupportedOperationException("commit() not supported.");
}