You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/08 23:44:40 UTC
[kafka] branch 1.1 updated: MINOR: clean stateDirectory in
TopologyTestDriver (#4655)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new d3c4456 MINOR: clean stateDirectory in TopologyTestDriver (#4655)
d3c4456 is described below
commit d3c4456e2bec11e802a603145c29f5c190087f40
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Mar 8 15:43:45 2018 -0800
MINOR: clean stateDirectory in TopologyTestDriver (#4655)
Author: John Roeler <jo...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../apache/kafka/streams/TopologyTestDriver.java | 4 +-
.../kafka/streams/TopologyTestDriverTest.java | 59 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 1 deletion(-)
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index a108f22..abcc99d 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -172,6 +172,7 @@ public class TopologyTestDriver {
private StreamTask task;
private GlobalStateUpdateTask globalStateTask;
+ private final StateDirectory stateDirectory;
private final ProcessorTopology processorTopology;
private final MockProducer<byte[], byte[]> producer;
@@ -221,7 +222,7 @@ public class TopologyTestDriver {
};
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- final StateDirectory stateDirectory = new StateDirectory(streamsConfig, mockTime);
+ stateDirectory = new StateDirectory(streamsConfig, mockTime);
final StreamsMetrics streamsMetrics = new StreamsMetricsImpl(
new Metrics(),
"topology-test-driver-stream-metrics",
@@ -564,6 +565,7 @@ public class TopologyTestDriver {
}
}
captureOutputRecords();
+ stateDirectory.clean();
}
static class MockTime implements Time {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 17d5e02..b74a754 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -838,4 +838,63 @@ public class TopologyTestDriverTest {
@Override
public void close() {}
}
+
+ @Test
+ public void shouldCleanUpPersistentStateStoresOnClose() {
+ final Topology topology = new Topology();
+ topology.addSource("sourceProcessor", "input-topic");
+ topology.addProcessor(
+ "storeProcessor",
+ new ProcessorSupplier() {
+ @Override
+ public Processor get() {
+ return new Processor<String, Long>() {
+ private KeyValueStore<String, Long> store;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ //noinspection unchecked
+ this.store = (KeyValueStore<String, Long>) context.getStateStore("storeProcessorStore");
+ }
+
+ @Override
+ public void process(final String key, final Long value) {
+ store.put(key, value);
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ };
+ }
+ },
+ "sourceProcessor"
+ );
+ topology.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("storeProcessorStore"), Serdes.String(), Serdes.Long()), "storeProcessor");
+
+ final Properties config = new Properties();
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver-cleanup");
+ config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+ config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+
+ {
+ final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+ Assert.assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
+ testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L));
+ Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a"));
+ testDriver.close();
+ }
+
+ {
+ final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+ Assert.assertNull(
+ "Closing the prior test driver should have cleaned up this store and value.",
+ testDriver.getKeyValueStore("storeProcessorStore").get("a")
+ );
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.