Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/26 19:40:25 UTC

kafka git commit: MINOR: Fix ProcessorTopologyTestDriver to support multiple source topics

Repository: kafka
Updated Branches:
  refs/heads/trunk d4c379832 -> 190239441


MINOR: Fix ProcessorTopologyTestDriver to support multiple source topics

There's a minor bug in ProcessorTopologyTestDriver that prevents it from working with a topology that contains multiple sources. The bug is that ```consumer.assign()``` is called while looping through all the source topics, but ```consumer.assign()``` resets the state of the MockConsumer to consume only from the topics passed in. This patch fixes the issue by calling ```consumer.assign()``` once with all the TopicPartition instances. A unit test (testDrivingSimpleMultiSourceTopology) is included.
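
For context, a minimal standalone sketch (not part of this patch; class name and topic names are illustrative only) of why assigning inside the loop breaks a multi-source topology: each ```assign()``` call replaces the consumer's previous assignment, so only the last source topic survives, whereas collecting all partitions and assigning once keeps every source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MultiSourceAssignSketch {

    public static void main(String[] args) {
        List<String> sourceTopics = Arrays.asList("input-topic-1", "input-topic-2");
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

        // Buggy pattern: assign() inside the loop replaces the previous assignment,
        // so after the loop only the last source topic's partition is assigned.
        for (String topic : sourceTopics) {
            consumer.assign(Collections.singletonList(new TopicPartition(topic, 1)));
        }
        System.out.println(consumer.assignment()); // only input-topic-2's partition

        // Fixed pattern: collect every TopicPartition first, then call assign() once.
        List<TopicPartition> partitions = new ArrayList<>();
        for (String topic : sourceTopics) {
            partitions.add(new TopicPartition(topic, 1));
        }
        consumer.assign(partitions);
        System.out.println(consumer.assignment()); // partitions for both source topics
    }
}
```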

This contribution is my original work and I license the work to the project under the project's open source license.

Author: Mathieu Fenniak <ma...@replicon.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1782 from mfenniak/ProcessorTopologyTestDriver-multiple-source-bugfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/19023944
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/19023944
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/19023944

Branch: refs/heads/trunk
Commit: 190239441878d73ad0c50d59c73bcc8717a12ed6
Parents: d4c3798
Author: Mathieu Fenniak <ma...@replicon.com>
Authored: Fri Aug 26 12:40:18 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Aug 26 12:40:18 2016 -0700

----------------------------------------------------------------------
 .../internals/ProcessorTopologyTest.java        | 72 +++++++++++++-------
 .../kafka/test/ProcessorTopologyTestDriver.java |  2 +-
 2 files changed, 49 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/19023944/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index f7ef7f7..09434c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -54,7 +54,8 @@ public class ProcessorTopologyTest {
     private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
     private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
 
-    protected static final String INPUT_TOPIC = "input-topic";
+    protected static final String INPUT_TOPIC_1 = "input-topic-1";
+    protected static final String INPUT_TOPIC_2 = "input-topic-2";
     protected static final String OUTPUT_TOPIC_1 = "output-topic-1";
     protected static final String OUTPUT_TOPIC_2 = "output-topic-2";
 
@@ -117,17 +118,17 @@ public class ProcessorTopologyTest {
     public void testDrivingSimpleTopology() {
         int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition));
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
 
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
 
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4", partition);
@@ -137,17 +138,17 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingMultiplexingTopology() {
         driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology());
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
 
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
 
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
@@ -159,17 +160,17 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingMultiplexByNameTopology() {
         driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology());
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
 
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
 
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
@@ -182,10 +183,10 @@ public class ProcessorTopologyTest {
     public void testDrivingStatefulTopology() {
         String storeName = "entries";
         driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNoOutputRecord(OUTPUT_TOPIC_1);
 
         KeyValueStore<String, String> store = driver.getKeyValueStore("entries");
@@ -195,6 +196,20 @@ public class ProcessorTopologyTest {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testDrivingSimpleMultiSourceTopology() {
+        int partition = 10;
+        driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition));
+
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+
+        driver.process(INPUT_TOPIC_2, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition);
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+    }
+
     protected void assertNextOutputRecord(String topic, String key, String value) {
         ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
@@ -225,27 +240,27 @@ public class ProcessorTopologyTest {
     }
 
     protected TopologyBuilder createSimpleTopology(int partition) {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
                                     .addProcessor("processor", define(new ForwardingProcessor()), "source")
                                     .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
     protected TopologyBuilder createMultiplexingTopology() {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
                                     .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
                                     .addSink("sink1", OUTPUT_TOPIC_1, "processor")
                                     .addSink("sink2", OUTPUT_TOPIC_2, "processor");
     }
 
     protected TopologyBuilder createMultiplexByNameTopology() {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
             .addSink("sink0", OUTPUT_TOPIC_1, "processor")
             .addSink("sink1", OUTPUT_TOPIC_2, "processor");
     }
 
     protected TopologyBuilder createStatefulTopology(String storeName) {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
                                     .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
                                     .addStateStore(
                                             Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
@@ -254,6 +269,15 @@ public class ProcessorTopologyTest {
                                     .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
+    protected TopologyBuilder createSimpleMultiSourceTopology(int partition) {
+        return new TopologyBuilder().addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+                .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1")
+                .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1")
+                .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
+                .addProcessor("processor-2", define(new ForwardingProcessor()), "source-2")
+                .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
+    }
+
     /**
      * A processor that simply forwards all messages to all children.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/19023944/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 6b8d969..8d2ad08 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -163,10 +163,10 @@ public class ProcessorTopologyTestDriver {
         // Set up all of the topic+partition information and subscribe the consumer to each ...
         for (String topic : topology.sourceTopics()) {
             TopicPartition tp = new TopicPartition(topic, 1);
-            consumer.assign(Collections.singletonList(tp));
             partitionsByTopic.put(topic, tp);
             offsetsByTopicPartition.put(tp, new AtomicLong());
         }
+        consumer.assign(offsetsByTopicPartition.keySet());
 
         task = new StreamTask(id,
             applicationId,