You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/09 21:31:53 UTC

kafka git commit: KAFKA-4927: Fix KStreamsTestDriver to not throw NPE when KStream.to() sinks are used

Repository: kafka
Updated Branches:
  refs/heads/trunk 8ace736f7 -> b982eefd3


KAFKA-4927: Fix KStreamsTestDriver to not throw NPE when KStream.to() sinks are used

A KStream.to() sink is also a topic,
... so the KStreamTestDriver has to fetch it when required

Author: Wim Van Leuven <wi...@bigboards.io>
Author: Wim Van Leuven <wi...@highestpoint.biz>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2716 from wimvanleuven/KAFKA-4927


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b982eefd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b982eefd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b982eefd

Branch: refs/heads/trunk
Commit: b982eefd37595b30600b7a979915db5da22271fe
Parents: 8ace736
Author: Wim Van Leuven <wi...@bigboards.io>
Authored: Tue May 9 14:31:50 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 9 14:31:50 2017 -0700

----------------------------------------------------------------------
 .../streams/kstream/KStreamBuilderTest.java     | 53 +++++++++++++++++---
 .../apache/kafka/test/KStreamTestDriver.java    | 45 ++++++++++-------
 2 files changed, 72 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b982eefd/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 6fc6bd4..e7cb669 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -79,13 +79,52 @@ public class KStreamBuilderTest {
         assertEquals("Y-0000000001", builder.newName("Y-"));
         assertEquals("Z-0000000002", builder.newName("Z-"));
 
-        KStreamBuilder newBuilder = new KStreamBuilder();
+        final KStreamBuilder newBuilder = new KStreamBuilder();
 
         assertEquals("X-0000000000", newBuilder.newName("X-"));
         assertEquals("Y-0000000001", newBuilder.newName("Y-"));
         assertEquals("Z-0000000002", newBuilder.newName("Z-"));
     }
 
+
+    @Test
+    public void shouldNotTryProcessingFromSinkTopic() {
+        final KStream<String, String> source = builder.stream("topic-source");
+        source.to("topic-sink");
+
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+
+        source.process(processorSupplier);
+
+        driver = new KStreamTestDriver(builder);
+        driver.setTime(0L);
+
+        driver.process("topic-source", "A", "aa");
+
+        // no exception was thrown
+        assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
+    }
+
+    @Test
+    public void shouldTryProcessingFromThoughTopic() {
+        final KStream<String, String> source = builder.stream("topic-source");
+        final KStream<String, String> through = source.through("topic-sink");
+
+        final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
+
+        source.process(sourceProcessorSupplier);
+        through.process(throughProcessorSupplier);
+
+        driver = new KStreamTestDriver(builder);
+        driver.setTime(0L);
+
+        driver.process("topic-source", "A", "aa");
+
+        assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
+        assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
+    }
+
     @Test
     public void testNewStoreName() {
         assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-"));
@@ -101,14 +140,14 @@ public class KStreamBuilderTest {
 
     @Test
     public void testMerge() {
-        String topic1 = "topic-1";
-        String topic2 = "topic-2";
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
 
-        KStream<String, String> source1 = builder.stream(topic1);
-        KStream<String, String> source2 = builder.stream(topic2);
-        KStream<String, String> merged = builder.merge(source1, source2);
+        final KStream<String, String> source1 = builder.stream(topic1);
+        final KStream<String, String> source2 = builder.stream(topic2);
+        final KStream<String, String> merged = builder.merge(source1, source2);
 
-        MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
         driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b982eefd/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 73c944a..9fb83ba 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -22,12 +22,11 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -109,23 +108,27 @@ public class KStreamTestDriver {
 
     public void process(final String topicName, final Object key, final Object value) {
         final ProcessorNode prevNode = context.currentNode();
-        ProcessorNode currNode = topology.source(topicName);
-        if (currNode == null && globalTopology != null) {
-            currNode = globalTopology.source(topicName);
-        }
+        final ProcessorNode currNode = sourceNodeByTopicName(topicName);
 
-        // if currNode is null, check if this topic is a changelog topic;
-        // if yes, skip
-        if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
-            return;
+        if (currNode != null) {
+            context.setRecordContext(createRecordContext(context.timestamp()));
+            context.setCurrentNode(currNode);
+            try {
+                context.forward(key, value);
+            } finally {
+                context.setCurrentNode(prevNode);
+            }
         }
-        context.setRecordContext(createRecordContext(context.timestamp()));
-        context.setCurrentNode(currNode);
-        try {
-            context.forward(key, value);
-        } finally {
-            context.setCurrentNode(prevNode);
+    }
+
+    private ProcessorNode sourceNodeByTopicName(final String topicName) {
+        ProcessorNode topicNode = topology.source(topicName);
+
+        if (topicNode == null && globalTopology != null) {
+            topicNode = globalTopology.source(topicName);
         }
+
+        return topicNode;
     }
 
     public void punctuate(final long timestamp) {
@@ -224,7 +227,9 @@ public class KStreamTestDriver {
                                 final Serializer<V> valueSerializer,
                                 final StreamPartitioner<? super K, ? super V> partitioner) {
             // The serialization is skipped.
-            process(topic, key, value);
+            if (sourceNodeByTopicName(topic) != null) {
+                process(topic, key, value);
+            }
         }
 
         @Override
@@ -236,7 +241,9 @@ public class KStreamTestDriver {
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valueSerializer) {
         // The serialization is skipped.
-            process(topic, key, value);
+            if (sourceNodeByTopicName(topic) != null) {
+                process(topic, key, value);
+            }
         }
 
         @Override