You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/20 22:39:21 UTC

[kafka] branch trunk updated: KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 46e7b13  KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)
46e7b13 is described below

commit 46e7b136e22430340337d3540627a1d685dfa31e
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Nov 20 14:39:12 2018 -0800

    KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)
    
    In the TopologyTestDriver constructor, set a non-null topic on the initial record context; in the unit test, intentionally turn on caching to verify this case.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../processor/internals/AbstractProcessorContext.java   |  2 +-
 .../org/apache/kafka/streams/TopologyTestDriver.java    |  4 ++--
 .../apache/kafka/streams/TopologyTestDriverTest.java    | 17 +++++++++++++++++
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 0753b2a..af8b073 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -33,7 +33,7 @@ import java.util.Objects;
 
 public abstract class AbstractProcessorContext implements InternalProcessorContext {
 
-    static final String NONEXIST_TOPIC = "__null_topic__";
+    public static final String NONEXIST_TOPIC = "__null_topic__";
     private final TaskId taskId;
     private final String applicationId;
     private final StreamsConfig config;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2abfd63..a11ae6b 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -325,7 +325,7 @@ public class TopologyTestDriver implements Closeable {
                 new LogContext()
             );
             globalStateTask.initialize();
-            globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+            globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -352,7 +352,7 @@ public class TopologyTestDriver implements Closeable {
             task.initializeStateStores();
             task.initializeTopology();
             context = (InternalProcessorContext) task.context();
-            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
         } else {
             task = null;
             context = null;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index bead079..3e95c73 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -890,6 +890,23 @@ public class TopologyTestDriverTest {
     }
 
     @Test
+    public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
+        final Topology topology = new Topology();
+        topology.addSource("sourceProcessor", "input-topic");
+        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
+        topology.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("aggStore"),
+            Serdes.String(),
+            Serdes.Long()).withCachingEnabled(), // intentionally turn on caching to achieve better test coverage
+            "aggregator");
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        store = testDriver.getKeyValueStore("aggStore");
+        store.put("a", 21L);
+    }
+
+    @Test
     public void shouldCleanUpPersistentStateStoresOnClose() {
         final Topology topology = new Topology();
         topology.addSource("sourceProcessor", "input-topic");