You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/20 22:48:57 UTC
[kafka] branch 2.1 updated: KAFKA-7536: Initialize
TopologyTestDriver with non-null topic (#5923)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 57cb9a7 KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)
57cb9a7 is described below
commit 57cb9a7a3f7b3d2f44e295291fa5303e3ba34af6
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Nov 20 14:39:12 2018 -0800
KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)
In TopologyTestDriver constructor set non-null topic; and in unit test intentionally turn on caching to verify this case.
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../processor/internals/AbstractProcessorContext.java | 2 +-
.../org/apache/kafka/streams/TopologyTestDriver.java | 4 ++--
.../apache/kafka/streams/TopologyTestDriverTest.java | 17 +++++++++++++++++
3 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 0753b2a..af8b073 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -33,7 +33,7 @@ import java.util.Objects;
public abstract class AbstractProcessorContext implements InternalProcessorContext {
- static final String NONEXIST_TOPIC = "__null_topic__";
+ public static final String NONEXIST_TOPIC = "__null_topic__";
private final TaskId taskId;
private final String applicationId;
private final StreamsConfig config;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2abfd63..a11ae6b 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -325,7 +325,7 @@ public class TopologyTestDriver implements Closeable {
new LogContext()
);
globalStateTask.initialize();
- globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+ globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
} else {
globalStateManager = null;
globalStateTask = null;
@@ -352,7 +352,7 @@ public class TopologyTestDriver implements Closeable {
task.initializeStateStores();
task.initializeTopology();
context = (InternalProcessorContext) task.context();
- context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+ context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
} else {
task = null;
context = null;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 58f6e02..e01e739 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -890,6 +890,23 @@ public class TopologyTestDriverTest {
}
@Test
+ public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
+ final Topology topology = new Topology();
+ topology.addSource("sourceProcessor", "input-topic");
+ topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
+ topology.addStateStore(Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("aggStore"),
+ Serdes.String(),
+ Serdes.Long()).withCachingEnabled(), // intentionally turn on caching to achieve better test coverage
+ "aggregator");
+
+ testDriver = new TopologyTestDriver(topology, config);
+
+ store = testDriver.getKeyValueStore("aggStore");
+ store.put("a", 21L);
+ }
+
+ @Test
public void shouldCleanUpPersistentStateStoresOnClose() {
final Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");