You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/08 23:44:40 UTC

[kafka] branch 1.1 updated: MINOR: clean stateDirectory in TopologyTestDriver (#4655)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new d3c4456  MINOR: clean stateDirectory in TopologyTestDriver (#4655)
d3c4456 is described below

commit d3c4456e2bec11e802a603145c29f5c190087f40
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Mar 8 15:43:45 2018 -0800

    MINOR: clean stateDirectory in TopologyTestDriver (#4655)
    
    Author: John Roeler <jo...@confluent.io>
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../apache/kafka/streams/TopologyTestDriver.java   |  4 +-
 .../kafka/streams/TopologyTestDriverTest.java      | 59 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index a108f22..abcc99d 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -172,6 +172,7 @@ public class TopologyTestDriver {
     private StreamTask task;
     private GlobalStateUpdateTask globalStateTask;
 
+    private final StateDirectory stateDirectory;
     private final ProcessorTopology processorTopology;
     private final MockProducer<byte[], byte[]> producer;
 
@@ -221,7 +222,7 @@ public class TopologyTestDriver {
         };
 
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        final StateDirectory stateDirectory = new StateDirectory(streamsConfig, mockTime);
+        stateDirectory = new StateDirectory(streamsConfig, mockTime);
         final StreamsMetrics streamsMetrics = new StreamsMetricsImpl(
             new Metrics(),
             "topology-test-driver-stream-metrics",
@@ -564,6 +565,7 @@ public class TopologyTestDriver {
             }
         }
         captureOutputRecords();
+        stateDirectory.clean();
     }
 
     static class MockTime implements Time {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 17d5e02..b74a754 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -838,4 +838,63 @@ public class TopologyTestDriverTest {
         @Override
         public void close() {}
     }
+
+    @Test
+    public void shouldCleanUpPersistentStateStoresOnClose() {
+        final Topology topology = new Topology();
+        topology.addSource("sourceProcessor", "input-topic");
+        topology.addProcessor(
+            "storeProcessor",
+            new ProcessorSupplier() {
+                @Override
+                public Processor get() {
+                    return new Processor<String, Long>() {
+                        private KeyValueStore<String, Long> store;
+
+                        @Override
+                        public void init(final ProcessorContext context) {
+                            //noinspection unchecked
+                            this.store = (KeyValueStore<String, Long>) context.getStateStore("storeProcessorStore");
+                        }
+
+                        @Override
+                        public void process(final String key, final Long value) {
+                            store.put(key, value);
+                        }
+
+                        @Override
+                        public void punctuate(final long timestamp) {}
+
+                        @Override
+                        public void close() {}
+                    };
+                }
+            },
+            "sourceProcessor"
+        );
+        topology.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("storeProcessorStore"), Serdes.String(), Serdes.Long()), "storeProcessor");
+
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver-cleanup");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+
+        {
+            final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+            Assert.assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
+            testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L));
+            Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a"));
+            testDriver.close();
+        }
+
+        {
+            final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+            Assert.assertNull(
+                "Closing the prior test driver should have cleaned up this store and value.",
+                testDriver.getKeyValueStore("storeProcessorStore").get("a")
+            );
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.