You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/07 19:20:21 UTC

kafka git commit: KAFKA-3497: Streams ProcessorContext should support forward() based on child name

Repository: kafka
Updated Branches:
  refs/heads/trunk 99d232922 -> 8dbd688b1


KAFKA-3497: Streams ProcessorContext should support forward() based on child name

Author: Eno Thereska <en...@gmail.com>

Reviewers: Yuto Kawamura, Michael G. Noll, Guozhang Wang

Closes #1194 from enothereska/kafka-3497-forward


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8dbd688b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8dbd688b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8dbd688b

Branch: refs/heads/trunk
Commit: 8dbd688b1617968329087317fa6bde8b8df0392e
Parents: 99d2329
Author: Eno Thereska <en...@gmail.com>
Authored: Thu Apr 7 10:20:17 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 7 10:20:17 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/ProcessorContext.java     |  9 ++++
 .../internals/ProcessorContextImpl.java         |  5 ++
 .../processor/internals/StandbyContextImpl.java |  5 ++
 .../streams/processor/internals/StreamTask.java | 16 ++++++
 .../internals/ProcessorTopologyTest.java        | 56 ++++++++++++++++++++
 .../apache/kafka/test/KStreamTestDriver.java    | 16 ++++++
 .../apache/kafka/test/MockProcessorContext.java |  6 +++
 7 files changed, 113 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 434996e..8bac3e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -106,10 +106,19 @@ public interface ProcessorContext {
      * Forwards a key/value pair to one of the downstream processors designated by childIndex
      * @param key key
      * @param value value
+     * @param childIndex index in list of children of this node
      */
     <K, V> void forward(K key, V value, int childIndex);
 
     /**
+     * Forwards a key/value pair to one of the downstream processors designated by the downstream processor name
+     * @param key key
+     * @param value value
+     * @param childName name of downstream processor
+     */
+    <K, V> void forward(K key, V value, String childName);
+
+    /**
      * Requests a commit
      */
     void commit();

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 888b89e..5bda856 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -168,6 +168,11 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     }
 
     @Override
+    public <K, V> void forward(K key, V value, String childName) {
+        task.forward(key, value, childName);
+    }
+
+    @Override
     public void commit() {
         task.needCommit();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 3ad06e2..d5a9683 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -142,6 +142,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     }
 
     @Override
+    public <K, V> void forward(K key, V value, String childName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void commit() {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 61aeced..a484980 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -342,4 +342,20 @@ public class StreamTask extends AbstractTask implements Punctuator {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value, String childName) {
+        ProcessorNode thisNode = currNode;
+        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+            if (childNode.name().equals(childName)) {
+                currNode = childNode;
+                try {
+                    childNode.process(key, value);
+                } finally {
+                    currNode = thisNode;
+                }
+                break;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index ef08176..1095fcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -157,6 +157,28 @@ public class ProcessorTopologyTest {
     }
 
     @Test
+    public void testDrivingMultiplexByNameTopology() {
+        driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology());
+        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
+
+        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
+
+        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)");
+    }
+
+    @Test
     public void testDrivingStatefulTopology() {
         String storeName = "entries";
         driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
@@ -215,6 +237,13 @@ public class ProcessorTopologyTest {
                                     .addSink("sink2", OUTPUT_TOPIC_2, "processor");
     }
 
+    protected TopologyBuilder createMultiplexByNameTopology() {
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+            .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
+            .addSink("sink0", OUTPUT_TOPIC_1, "processor")
+            .addSink("sink1", OUTPUT_TOPIC_2, "processor");
+    }
+
     protected TopologyBuilder createStatefulTopology(String storeName) {
         return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
                                     .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
@@ -268,6 +297,33 @@ public class ProcessorTopologyTest {
     }
 
     /**
+     * A processor that forwards slightly-modified messages to each named child.
+     * Note: the children are assumed to be named "sink{child number}", e.g., sink1, or sink2, etc.
+     */
+    protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
+
+        private final int numChildren;
+
+        public MultiplexByNameProcessor(int numChildren) {
+            this.numChildren = numChildren;
+        }
+
+        @Override
+        public void process(String key, String value) {
+            for (int i = 0; i != numChildren; ++i) {
+                context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
+            }
+        }
+
+        @Override
+        public void punctuate(long streamTime) {
+            for (int i = 0; i != numChildren; ++i) {
+                context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
+            }
+        }
+    }
+
+    /**
      * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When
      * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 0c56c26..5cfee6b 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -120,6 +120,22 @@ public class KStreamTestDriver {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value, String childName) {
+        ProcessorNode thisNode = currNode;
+        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+            if (childNode.name().equals(childName)) {
+                currNode = childNode;
+                try {
+                    childNode.process(key, value);
+                } finally {
+                    currNode = thisNode;
+                }
+                break;
+            }
+        }
+    }
+
     public Map<String, StateStore> allStateStores() {
         return context.allStateStores();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index e57e1c7..d3b8081 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -159,6 +159,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value, String childName) {
+        driver.forward(key, value, childName);
+    }
+
+    @Override
     public void commit() {
         throw new UnsupportedOperationException("commit() not supported.");
     }