You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/09 21:31:53 UTC
kafka git commit: KAFKA-4927: Fix KStreamsTestDriver to not throw NPE
when KStream.to() sinks are used
Repository: kafka
Updated Branches:
refs/heads/trunk 8ace736f7 -> b982eefd3
KAFKA-4927: Fix KStreamsTestDriver to not throw NPE when KStream.to() sinks are used
a KStream.to() sink is also a topic
... so the KStreamTestDriver needs to fetch it when required
Author: Wim Van Leuven <wi...@bigboards.io>
Author: Wim Van Leuven <wi...@highestpoint.biz>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2716 from wimvanleuven/KAFKA-4927
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b982eefd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b982eefd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b982eefd
Branch: refs/heads/trunk
Commit: b982eefd37595b30600b7a979915db5da22271fe
Parents: 8ace736
Author: Wim Van Leuven <wi...@bigboards.io>
Authored: Tue May 9 14:31:50 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 9 14:31:50 2017 -0700
----------------------------------------------------------------------
.../streams/kstream/KStreamBuilderTest.java | 53 +++++++++++++++++---
.../apache/kafka/test/KStreamTestDriver.java | 45 ++++++++++-------
2 files changed, 72 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b982eefd/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 6fc6bd4..e7cb669 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -79,13 +79,52 @@ public class KStreamBuilderTest {
assertEquals("Y-0000000001", builder.newName("Y-"));
assertEquals("Z-0000000002", builder.newName("Z-"));
- KStreamBuilder newBuilder = new KStreamBuilder();
+ final KStreamBuilder newBuilder = new KStreamBuilder();
assertEquals("X-0000000000", newBuilder.newName("X-"));
assertEquals("Y-0000000001", newBuilder.newName("Y-"));
assertEquals("Z-0000000002", newBuilder.newName("Z-"));
}
+
+ @Test
+ public void shouldNotTryProcessingFromSinkTopic() {
+ final KStream<String, String> source = builder.stream("topic-source");
+ source.to("topic-sink");
+
+ final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+
+ source.process(processorSupplier);
+
+ driver = new KStreamTestDriver(builder);
+ driver.setTime(0L);
+
+ driver.process("topic-source", "A", "aa");
+
+ // no exception was thrown
+ assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
+ }
+
+ @Test
+ public void shouldTryProcessingFromThoughTopic() {
+ final KStream<String, String> source = builder.stream("topic-source");
+ final KStream<String, String> through = source.through("topic-sink");
+
+ final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
+
+ source.process(sourceProcessorSupplier);
+ through.process(throughProcessorSupplier);
+
+ driver = new KStreamTestDriver(builder);
+ driver.setTime(0L);
+
+ driver.process("topic-source", "A", "aa");
+
+ assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
+ assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
+ }
+
@Test
public void testNewStoreName() {
assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-"));
@@ -101,14 +140,14 @@ public class KStreamBuilderTest {
@Test
public void testMerge() {
- String topic1 = "topic-1";
- String topic2 = "topic-2";
+ final String topic1 = "topic-1";
+ final String topic2 = "topic-2";
- KStream<String, String> source1 = builder.stream(topic1);
- KStream<String, String> source2 = builder.stream(topic2);
- KStream<String, String> merged = builder.merge(source1, source2);
+ final KStream<String, String> source1 = builder.stream(topic1);
+ final KStream<String, String> source2 = builder.stream(topic2);
+ final KStream<String, String> merged = builder.merge(source1, source2);
- MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
driver = new KStreamTestDriver(builder);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b982eefd/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 73c944a..9fb83ba 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -22,12 +22,11 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -109,23 +108,27 @@ public class KStreamTestDriver {
public void process(final String topicName, final Object key, final Object value) {
final ProcessorNode prevNode = context.currentNode();
- ProcessorNode currNode = topology.source(topicName);
- if (currNode == null && globalTopology != null) {
- currNode = globalTopology.source(topicName);
- }
+ final ProcessorNode currNode = sourceNodeByTopicName(topicName);
- // if currNode is null, check if this topic is a changelog topic;
- // if yes, skip
- if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
- return;
+ if (currNode != null) {
+ context.setRecordContext(createRecordContext(context.timestamp()));
+ context.setCurrentNode(currNode);
+ try {
+ context.forward(key, value);
+ } finally {
+ context.setCurrentNode(prevNode);
+ }
}
- context.setRecordContext(createRecordContext(context.timestamp()));
- context.setCurrentNode(currNode);
- try {
- context.forward(key, value);
- } finally {
- context.setCurrentNode(prevNode);
+ }
+
+ private ProcessorNode sourceNodeByTopicName(final String topicName) {
+ ProcessorNode topicNode = topology.source(topicName);
+
+ if (topicNode == null && globalTopology != null) {
+ topicNode = globalTopology.source(topicName);
}
+
+ return topicNode;
}
public void punctuate(final long timestamp) {
@@ -224,7 +227,9 @@ public class KStreamTestDriver {
final Serializer<V> valueSerializer,
final StreamPartitioner<? super K, ? super V> partitioner) {
// The serialization is skipped.
- process(topic, key, value);
+ if (sourceNodeByTopicName(topic) != null) {
+ process(topic, key, value);
+ }
}
@Override
@@ -236,7 +241,9 @@ public class KStreamTestDriver {
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
// The serialization is skipped.
- process(topic, key, value);
+ if (sourceNodeByTopicName(topic) != null) {
+ process(topic, key, value);
+ }
}
@Override