Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/06 06:42:46 UTC

kafka git commit: KAFKA-4828: ProcessorTopologyTestDriver does not work when using through

Repository: kafka
Updated Branches:
  refs/heads/trunk b1272500b -> 5b013d9cd


KAFKA-4828: ProcessorTopologyTestDriver does not work when using through

This resolves the following issues in the ProcessorTopologyTestDriver:

- It should not create an internal changelog topic when using `through()` and `table()`
- It should forward the produced record back into the topology if it is sent to a source topic (a usage sketch follows)
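
For illustration only, a minimal sketch of driving a topology built with `table()` and `through()` using the updated two-argument constructor; the topic names, store names, configuration values, and String serdes below are assumptions for the example, not taken from the commit:

    import static org.junit.Assert.assertEquals;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.test.ProcessorTopologyTestDriver;
    import org.junit.Test;

    public class ThroughExampleTest {

        @Test
        public void shouldDriveTopologyUsingThrough() {
            // Minimal driver configuration; the bootstrap servers are never contacted.
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "through-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            StreamsConfig config = new StreamsConfig(props);

            // table() backs its store with the source topic rather than an internal changelog,
            // and through() adds "intermediate-topic" as both a sink and a new source.
            KStreamBuilder builder = new KStreamBuilder();
            builder.table("input-topic", "input-store")
                   .through("intermediate-topic", "intermediate-store")
                   .to("output-topic");

            // Store names are no longer passed in: the driver derives the stores and their
            // changelog topics from the topology itself.
            ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);

            // The record produced to "intermediate-topic" (a source topic) is forwarded back
            // into the topology, so it reaches "output-topic" within the same call.
            driver.process("input-topic", "key1", "value1", new StringSerializer(), new StringSerializer());
            ProducerRecord<String, String> record =
                driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
            assertEquals("value1", record.value());
        }
    }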

Jira ticket: https://issues.apache.org/jira/browse/KAFKA-4828

The contribution is my original work and I license the work to the project under the project's open source license.

Author: Hamidreza Afzali <hr...@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2629 from hrafzali/KAFKA-4828_ProcessorTopologyTestDriver_through


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b013d9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b013d9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b013d9c

Branch: refs/heads/trunk
Commit: 5b013d9cd28d86b50109e8e08f813c3b5af0054b
Parents: b127250
Author: Hamidreza Afzali <hr...@gmail.com>
Authored: Sun Mar 5 22:42:16 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Mar 5 22:42:16 2017 -0800

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilderTest.java  |  2 +-
 .../internals/ProcessorTopologyTest.java        | 22 +++++++++-
 .../kafka/test/ProcessorTopologyTestDriver.java | 46 ++++++++++----------
 3 files changed, 43 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b013d9c/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index c3474b0..88a420a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -629,7 +629,7 @@ public class TopologyBuilderTest {
                     goodNodeName)
                 .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
 
-            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, LocalMockProcessorSupplier.STORE_NAME);
+            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder);
             driver.process("topic", null, null);
         } catch (final StreamsException e) {
             final Throwable cause = e.getCause();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b013d9c/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 322c178..65b3e2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -189,7 +189,7 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingStatefulTopology() {
         String storeName = "entries";
-        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
+        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName));
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -214,7 +214,7 @@ public class ProcessorTopologyTest {
         final TopologyBuilder topologyBuilder = this.builder
                 .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
 
-        driver = new ProcessorTopologyTestDriver(config, topologyBuilder, "my-store");
+        driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
         driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         assertEquals("value1", globalStore.get("key1"));
@@ -236,6 +236,17 @@ public class ProcessorTopologyTest {
     }
 
     @Test
+    public void testDrivingForwardToSourceTopology() {
+        driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology());
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3");
+    }
+
+    @Test
     public void testDrivingInternalRepartitioningTopology() {
         driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology());
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -380,6 +391,13 @@ public class ProcessorTopologyTest {
                 .addSink("sink1", OUTPUT_TOPIC_1, "source1");
     }
 
+    private TopologyBuilder createForwardToSourceTopology() {
+        return builder.addSource("source-1", INPUT_TOPIC_1)
+                .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
+                .addSource("source-2", OUTPUT_TOPIC_1)
+                .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
+    }
+
     private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
         return builder.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
                 .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b013d9c/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index b704aa7..1e97e11 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -52,7 +52,6 @@ import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
@@ -143,9 +142,10 @@ public class ProcessorTopologyTestDriver {
 
     private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
 
-    private final String applicationId = "test-driver-application";
+    private final static String APPLICATION_ID = "test-driver-application";
+    private final static int PARTITION_ID = 0;
+    private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
 
-    private final TaskId id;
     private final ProcessorTopology topology;
     private final MockConsumer<byte[], byte[]> consumer;
     private final MockProducer<byte[], byte[]> producer;
@@ -163,11 +163,9 @@ public class ProcessorTopologyTestDriver {
      * Create a new test driver instance.
      * @param config the stream configuration for the topology
      * @param builder the topology builder that will be used to create the topology instance
-     * @param storeNames the optional names of the state stores that are used by the topology
      */
-    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
-        id = new TaskId(0, 0);
-        topology = builder.setApplicationId(applicationId).build(null);
+    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder) {
+        topology = builder.setApplicationId(APPLICATION_ID).build(null);
         globalTopology  = builder.buildGlobalStateTopology();
 
         // Set up the consumer and producer ...
@@ -175,10 +173,10 @@ public class ProcessorTopologyTestDriver {
         producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
             @Override
             public List<PartitionInfo> partitionsFor(String topic) {
-                return Collections.singletonList(new PartitionInfo(topic, 0, null, null, null));
+                return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
             }
         };
-        restoreStateConsumer = createRestoreConsumer(id, storeNames);
+        restoreStateConsumer = createRestoreConsumer(TASK_ID, topology.storeToChangelogTopic());
 
         // Identify internal topics for forwarding in process ...
         for (TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
@@ -187,14 +185,14 @@ public class ProcessorTopologyTestDriver {
 
         // Set up all of the topic+partition information and subscribe the consumer to each ...
         for (String topic : topology.sourceTopics()) {
-            TopicPartition tp = new TopicPartition(topic, 1);
+            TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
             partitionsByTopic.put(topic, tp);
             offsetsByTopicPartition.put(tp, new AtomicLong());
         }
 
         consumer.assign(offsetsByTopicPartition.keySet());
 
-        final StateDirectory stateDirectory = new StateDirectory(applicationId, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
+        final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics);
 
@@ -218,8 +216,8 @@ public class ProcessorTopologyTestDriver {
         }
 
         if (!partitionsByTopic.isEmpty()) {
-            task = new StreamTask(id,
-                                  applicationId,
+            task = new StreamTask(TASK_ID,
+                                  APPLICATION_ID,
                                   partitionsByTopic.values(),
                                   topology,
                                   consumer,
@@ -263,8 +261,8 @@ public class ProcessorTopologyTestDriver {
                 }
                 outputRecords.add(record);
 
-                // Forward back into the topology if the produced record is to an internal topic ...
-                if (internalTopics.contains(record.topic())) {
+                // Forward back into the topology if the produced record is to an internal or a source topic ...
+                if (internalTopics.contains(record.topic()) || topology.sourceTopics().contains(record.topic())) {
                     process(record.topic(), record.key(), record.value(), record.timestamp());
                 }
             }
@@ -339,7 +337,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Get the {@link StateStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
@@ -355,7 +353,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Get the {@link KeyValueStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
@@ -393,10 +391,10 @@ public class ProcessorTopologyTestDriver {
      * driver object unless this method is overwritten with a functional consumer.
      *
      * @param id the ID of the stream task
-     * @param storeNames the names of the stores that this
+     * @param storeToChangelogTopic the map of the names of the stores to the changelog topics
      * @return the mock consumer; never null
      */
-    protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, String... storeNames) {
+    protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, Map<String, String> storeToChangelogTopic) {
         MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
             @Override
             public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
@@ -414,16 +412,16 @@ public class ProcessorTopologyTestDriver {
                 return 0L;
             }
         };
-        // For each store name ...
-        for (String storeName : storeNames) {
-            String topicName = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
+        // For each store ...
+        for (Map.Entry<String, String> storeAndTopic: storeToChangelogTopic.entrySet()) {
+            String topicName = storeAndTopic.getValue();
             // Set up the restore-state topic ...
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
             List<PartitionInfo> partitionInfos = new ArrayList<>();
-            partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
+            partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null));
             consumer.updatePartitions(topicName, partitionInfos);
-            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
+            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L));
         }
         return consumer;
     }