You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/11/27 21:09:52 UTC

[kafka] branch trunk updated: KAFKA-7367: Streams should not create state store directories unless they are needed (#5696)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new de24d4a  KAFKA-7367: Streams should not create state store directories unless they are needed (#5696)
de24d4a is described below

commit de24d4a459ccfac623e71722f8df8b1e99f2ad43
Author: Kamal Chandraprakash <ka...@gmail.com>
AuthorDate: Wed Nov 28 02:39:44 2018 +0530

    KAFKA-7367: Streams should not create state store directories unless they are needed (#5696)
    
    * KAFKA-7367: Ensure stateless topologies don't require disk access
    
    * KAFKA-7367: Streams should not create state store directories unless they are needed.
    
    * Addressed the review comments.
    
    * Addressed the review-2 comments.
    
    * Fixed FileAlreadyExistsException
    
    * Addressed the review-3 comments.
    
    * Resolved the conflicts.
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  16 +-
 .../processor/internals/ProcessorTopology.java     |  18 +++
 .../processor/internals/StateDirectory.java        |  37 +++--
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 161 ++++++++++++++++++++-
 .../integration/RestoreIntegrationTest.java        |   4 +-
 .../processor/internals/AbstractTaskTest.java      |   2 +-
 .../internals/GlobalStateManagerImplTest.java      |  12 +-
 .../internals/GlobalStreamThreadTest.java          |   4 +-
 .../internals/ProcessorStateManagerTest.java       |   4 +-
 .../processor/internals/ProcessorTopologyTest.java |  70 +++++++--
 .../processor/internals/StandbyTaskTest.java       |   2 +-
 .../processor/internals/StateDirectoryTest.java    |  83 +++++++----
 .../processor/internals/StreamTaskTest.java        |   4 +-
 .../processor/internals/StreamThreadTest.java      |   2 +-
 .../StreamThreadStateStoreProviderTest.java        |   2 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   4 +-
 .../kafka/streams/TopologyTestDriverTest.java      |  37 ++++-
 17 files changed, 375 insertions(+), 87 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bbda11d..c29b7bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -644,12 +644,6 @@ public class KafkaStreams implements AutoCloseable {
         final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
 
-        try {
-            stateDirectory = new StateDirectory(config, time);
-        } catch (final ProcessorStateException fatal) {
-            throw new StreamsException(fatal);
-        }
-
         final MetricConfig metricConfig = new MetricConfig()
             .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -664,7 +658,7 @@ public class KafkaStreams implements AutoCloseable {
         internalTopologyBuilder.rewriteTopology(config);
 
         // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
-        internalTopologyBuilder.build();
+        final ProcessorTopology taskTopology = internalTopologyBuilder.build();
 
         streamsMetadataState = new StreamsMetadataState(
                 internalTopologyBuilder,
@@ -680,6 +674,14 @@ public class KafkaStreams implements AutoCloseable {
         }
         final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
         final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
+        final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
+                (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
+
+        try {
+            stateDirectory = new StateDirectory(config, time, createStateDirectory);
+        } catch (final ProcessorStateException fatal) {
+            throw new StreamsException(fatal);
+        }
 
         final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         GlobalStreamThread.State globalThreadState = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 8fcbbb4..57af1f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -152,6 +152,24 @@ public class ProcessorTopology {
         return repartitionTopics.contains(topic);
     }
 
+    public boolean hasPersistentLocalStore() {
+        for (final StateStore store : stateStores) {
+            if (store.persistent()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean hasPersistentGlobalStore() {
+        for (final StateStore store : globalStateStores) {
+            if (store.persistent()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private String childrenToString(final String indent, final List<ProcessorNode<?, ?>> children) {
         if (children == null || children.isEmpty()) {
             return "";
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index a3227ca..f5c4c31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
@@ -50,6 +49,7 @@ public class StateDirectory {
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
 
     private final File stateDir;
+    private final boolean createStateDirectory;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
     private final Time time;
@@ -71,19 +71,21 @@ public class StateDirectory {
      * Ensures that the state base directory as well as the application's sub-directory are created.
      *
      * @throws ProcessorStateException if the base state directory or application state directory does not exist
-     *                                 and could not be created
+     *                                 and could not be created when createStateDirectory is enabled.
      */
     public StateDirectory(final StreamsConfig config,
-                          final Time time) {
+                          final Time time,
+                          final boolean createStateDirectory) {
         this.time = time;
+        this.createStateDirectory = createStateDirectory;
         final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
-        if (!baseDir.exists() && !baseDir.mkdirs()) {
+        if (this.createStateDirectory && !baseDir.exists() && !baseDir.mkdirs()) {
             throw new ProcessorStateException(
                 String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
         }
         stateDir = new File(baseDir, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
-        if (!stateDir.exists() && !stateDir.mkdir()) {
+        if (this.createStateDirectory && !stateDir.exists() && !stateDir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
         }
@@ -96,7 +98,7 @@ public class StateDirectory {
      */
     public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
-        if (!taskDir.exists() && !taskDir.mkdir()) {
+        if (createStateDirectory && !taskDir.exists() && !taskDir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
         }
@@ -110,7 +112,7 @@ public class StateDirectory {
      */
     File globalStateDir() {
         final File dir = new File(stateDir, "global");
-        if (!dir.exists() && !dir.mkdir()) {
+        if (createStateDirectory && !dir.exists() && !dir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
         }
@@ -128,6 +130,9 @@ public class StateDirectory {
      * @throws IOException
      */
     synchronized boolean lock(final TaskId taskId) throws IOException {
+        if (!createStateDirectory) {
+            return true;
+        }
 
         final File lockFile;
         // we already have the lock so bail out here
@@ -169,6 +174,10 @@ public class StateDirectory {
     }
 
     synchronized boolean lockGlobalState() throws IOException {
+        if (!createStateDirectory) {
+            return true;
+        }
+
         if (globalStateLock != null) {
             log.trace("{} Found cached state dir lock for the global task", logPrefix());
             return true;
@@ -234,7 +243,9 @@ public class StateDirectory {
             throw new StreamsException(e);
         }
         try {
-            Utils.delete(globalStateDir().getAbsoluteFile());
+            if (stateDir.exists()) {
+                Utils.delete(globalStateDir().getAbsoluteFile());
+            }
         } catch (final IOException e) {
             log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e);
             throw new StreamsException(e);
@@ -320,12 +331,8 @@ public class StateDirectory {
      * @return The list of all the existing local directories for stream tasks
      */
     File[] listTaskDirectories() {
-        return stateDir.listFiles(new FileFilter() {
-            @Override
-            public boolean accept(final File pathname) {
-                return pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches();
-            }
-        });
+        return !stateDir.exists() ? new File[0] :
+                stateDir.listFiles(pathname -> pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches());
     }
 
     private FileChannel getOrCreateFileChannel(final TaskId taskId,
@@ -344,6 +351,4 @@ public class StateDirectory {
         }
     }
 
-
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index fde5bff..3ca0dcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -27,27 +30,37 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 import java.io.File;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -57,6 +70,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -79,6 +93,9 @@ public class KafkaStreamsTest {
     private KafkaStreams globalStreams;
     private Properties props;
 
+    @Rule
+    public TestName testName = new TestName();
+
     @Before
     public void before() {
         props = new Properties();
@@ -125,8 +142,8 @@ public class KafkaStreamsTest {
         final StateListenerStub stateListener = new StateListenerStub();
         globalStreams.setStateListener(stateListener);
 
-        Assert.assertEquals(globalStreams.state(), KafkaStreams.State.CREATED);
-        Assert.assertEquals(stateListener.numChanges, 0);
+        Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
+        Assert.assertEquals(0, stateListener.numChanges);
 
         globalStreams.start();
         TestUtils.waitForCondition(
@@ -136,7 +153,7 @@ public class KafkaStreamsTest {
 
         globalStreams.close();
 
-        Assert.assertEquals(globalStreams.state(), KafkaStreams.State.NOT_RUNNING);
+        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
     }
 
     @Test
@@ -158,7 +175,7 @@ public class KafkaStreamsTest {
         builder.globalTable("anyTopic");
         final List<Node> nodes = asList(new Node(0, "localhost", 8121));
         final Cluster cluster = new Cluster("mockClusterId", nodes,
-                                            Collections.emptySet(), Collections.<String>emptySet(),
+                                            Collections.emptySet(), Collections.emptySet(),
                                             Collections.emptySet(), nodes.get(0));
         final MockClientSupplier clientSupplier = new MockClientSupplier();
         clientSupplier.setClusterForAdminClient(cluster);
@@ -349,6 +366,7 @@ public class KafkaStreamsTest {
 
         try {
             globalStreams.start();
+            fail("Should throw an IllegalStateException");
         } catch (final IllegalStateException e) {
             // this is ok
         } finally {
@@ -575,6 +593,141 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
+        final String inputTopic = testName.getMethodName() + "-input";
+        final String outputTopic = testName.getMethodName() + "-output";
+        CLUSTER.createTopics(inputTopic, outputTopic);
+
+        final Topology topology = new Topology();
+        topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
+                .addProcessor("process", () -> new AbstractProcessor<String, String>() {
+                    @Override
+                    public void process(final String key, final String value) {
+                        if (value.length() % 2 == 0) {
+                            context().forward(key, key + value);
+                        }
+                    }
+                }, "source")
+                .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process");
+        startStreamsAndCheckDirExists(topology, Collections.singleton(inputTopic), outputTopic, false);
+    }
+
+    @Test
+    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
+        final String inputTopic = testName.getMethodName() + "-input";
+        final String outputTopic = testName.getMethodName() + "-output";
+        final String globalTopicName = testName.getMethodName() + "-global";
+        final String storeName = testName.getMethodName() + "-counts";
+        final String globalStoreName = testName.getMethodName() + "-globalStore";
+        final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, false);
+        startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, false);
+    }
+
+    @Test
+    public void statefulTopologyShouldCreateStateDirectory() throws Exception {
+        final String inputTopic = testName.getMethodName() + "-input";
+        final String outputTopic = testName.getMethodName() + "-output";
+        final String globalTopicName = testName.getMethodName() + "-global";
+        final String storeName = testName.getMethodName() + "-counts";
+        final String globalStoreName = testName.getMethodName() + "-globalStore";
+        final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true);
+        startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, true);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Topology getStatefulTopology(final String inputTopic,
+                                         final String outputTopic,
+                                         final String globalTopicName,
+                                         final String storeName,
+                                         final String globalStoreName,
+                                         final boolean isPersistentStore) throws Exception {
+        CLUSTER.createTopics(inputTopic, outputTopic, globalTopicName);
+        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(isPersistentStore ?
+                Stores.persistentKeyValueStore(storeName) : Stores.inMemoryKeyValueStore(storeName), 
+                Serdes.String(), Serdes.Long());
+        final Topology topology = new Topology();
+        topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
+                .addProcessor("process", () -> new AbstractProcessor<String, String>() {
+                    @Override
+                    public void process(final String key, final String value) {
+                        final KeyValueStore<String, Long> kvStore =
+                                (KeyValueStore<String, Long>) context().getStateStore(storeName);
+                        kvStore.put(key, 5L);
+
+                        context().forward(key, "5");
+                        context().commit();
+                    }
+                }, "source")
+                .addStateStore(storeBuilder, "process")
+                .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process");
+
+        final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores.keyValueStoreBuilder(
+                isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName), 
+                Serdes.String(), Serdes.String()).withLoggingDisabled();
+        topology.addGlobalStore(globalStoreBuilder,
+                "global",
+                Serdes.String().deserializer(),
+                Serdes.String().deserializer(),
+                globalTopicName,
+                globalTopicName + "-processor",
+                new MockProcessorSupplier());
+        return topology;
+    }
+
+    private void startStreamsAndCheckDirExists(final Topology topology,
+                                               final Collection<String> inputTopics,
+                                               final String outputTopic,
+                                               final boolean shouldFilesExist) throws Exception {
+        final File baseDir = new File(TestUtils.IO_TMP_DIR + File.separator + "kafka-" + TestUtils.randomString(5));
+        final Path basePath = baseDir.toPath();
+        if (!baseDir.exists()) {
+            Files.createDirectory(basePath);
+        }
+        // changing the path of state directory to make sure that it should not clash with other test cases.
+        final Properties localProps = new Properties();
+        localProps.putAll(props);
+        localProps.put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getAbsolutePath());
+
+        final KafkaStreams streams = new KafkaStreams(topology, localProps);
+        streams.start();
+
+        for (final String topic : inputTopics) {
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
+                    Collections.singletonList(new KeyValue<>("A", "A")),
+                    TestUtils.producerConfig(
+                            CLUSTER.bootstrapServers(),
+                            StringSerializer.class,
+                            StringSerializer.class,
+                            new Properties()),
+                    System.currentTimeMillis());
+        }
+
+        IntegrationTestUtils.readKeyValues(outputTopic,
+                TestUtils.consumerConfig(
+                        CLUSTER.bootstrapServers(),
+                        outputTopic + "-group",
+                        StringDeserializer.class,
+                        StringDeserializer.class),
+                5000, 1);
+
+        try {
+            final List<Path> files = Files.find(basePath, 999, (p, bfa) -> !p.equals(basePath)).collect(Collectors.toList());
+            if (shouldFilesExist && files.isEmpty()) {
+                Assert.fail("Files should have existed, but it didn't: " + files);
+            }
+            if (!shouldFilesExist && !files.isEmpty()) {
+                Assert.fail("Files should not have existed, but it did: " + files);
+            }
+        } catch (final IOException e) {
+            Assert.fail("Couldn't read the state directory : " + baseDir.getPath());
+        } finally {
+            streams.close();
+            streams.cleanUp();
+            Utils.delete(baseDir);
+        }
+    }
+
     private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
         final File taskDir = new File(appDir, "0_0");
         TestUtils.waitForCondition(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 81938f8..c3c45db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -132,7 +132,7 @@ public class RestoreIntegrationTest {
         createStateForRestoration(INPUT_STREAM);
         setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
 
-        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime());
+        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true);
         new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
                 .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed));
         new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
@@ -199,7 +199,7 @@ public class RestoreIntegrationTest {
         createStateForRestoration(APPID + "-store-changelog");
         createStateForRestoration(INPUT_STREAM);
 
-        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime());
+        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true);
         new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
                 .write(Collections.singletonMap(new TopicPartition(APPID + "-store-changelog", 0), (long) offsetCheckpointed));
         new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 4ed44be..b3afa24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -152,7 +152,7 @@ public class AbstractTaskTest {
         expect(store4.name()).andReturn(storeName4).anyTimes();
         EasyMock.replay(store4);
 
-        final StateDirectory stateDirectory = new StateDirectory(streamsConfig, new MockTime());
+        final StateDirectory stateDirectory = new StateDirectory(streamsConfig, new MockTime(), true);
         final AbstractTask task = createTask(
             consumer,
             new HashMap<StateStore, String>() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 8789e2b..8b02b3e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -112,7 +112,7 @@ public class GlobalStateManagerImplTest {
                 put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
             }
         });
-        stateDirectory = new StateDirectory(streamsConfig, time);
+        stateDirectory = new StateDirectory(streamsConfig, time, true);
         consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
         stateManager = new GlobalStateManagerImpl(
             new LogContext("test"),
@@ -139,7 +139,7 @@ public class GlobalStateManagerImplTest {
 
     @Test(expected = LockException.class)
     public void shouldThrowLockExceptionIfCantGetLock() throws IOException {
-        final StateDirectory stateDir = new StateDirectory(streamsConfig, time);
+        final StateDirectory stateDir = new StateDirectory(streamsConfig, time, true);
         try {
             stateDir.lockGlobalState();
             stateManager.initialize();
@@ -357,8 +357,8 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
         stateManager.initialize();
-        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
-        final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime());
+        stateManager.close(Collections.emptyMap());
+        final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true);
         try {
             // should be able to get the lock now as it should've been released in close
             assertTrue(stateDir.lockGlobalState());
@@ -419,7 +419,7 @@ public class GlobalStateManagerImplTest {
         } catch (final StreamsException e) {
             // expected
         }
-        final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime());
+        final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true);
         try {
             // should be able to get the lock now as it should've been released
             assertTrue(stateDir.lockGlobalState());
@@ -511,7 +511,7 @@ public class GlobalStateManagerImplTest {
             new LogContext("mock"),
             topology,
             consumer,
-            new StateDirectory(streamsConfig, time) {
+            new StateDirectory(streamsConfig, time, true) {
                 @Override
                 public boolean lockGlobalState() throws IOException {
                     throw new IOException("KABOOM!");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 37a6fdb..387179b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -107,7 +107,7 @@ public class GlobalStreamThreadTest {
         globalStreamThread = new GlobalStreamThread(builder.rewriteTopology(config).buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
-                                                    new StateDirectory(config, time),
+                                                    new StateDirectory(config, time, true),
                                                     0,
                                                     new Metrics(),
                                                     new MockTime(),
@@ -140,7 +140,7 @@ public class GlobalStreamThreadTest {
         globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
-                                                    new StateDirectory(config, time),
+                                                    new StateDirectory(config, time, true),
                                                     0,
                                                     new Metrics(),
                                                     new MockTime(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index fbcb2c8..3d91ee0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -98,7 +98,7 @@ public class ProcessorStateManagerTest {
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                 put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath());
             }
-        }), new MockTime());
+        }), new MockTime(), true);
         checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
         checkpoint = new OffsetCheckpoint(checkpointFile);
     }
@@ -565,7 +565,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws IOException {
+    public void shouldFlushAllStoresEvenIfStoreThrowsException() throws IOException {
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
             Collections.singleton(changelogTopicPartition),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 587cae2..11050fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -38,7 +38,9 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -54,8 +56,10 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class ProcessorTopologyTest {
 
@@ -367,6 +371,58 @@ public class ProcessorTopologyTest {
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, 30L);
     }
 
+    @Test
+    public void statelessTopologyShouldNotHavePersistentStore() {
+        final TopologyWrapper topology = new TopologyWrapper();
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("anyAppId").build();
+        assertFalse(processorTopology.hasPersistentLocalStore());
+        assertFalse(processorTopology.hasPersistentGlobalStore());
+    }
+
+    @Test
+    public void inMemoryStoreShouldNotResultInPersistentLocalStore() {
+        final ProcessorTopology processorTopology = createLocalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
+        assertFalse(processorTopology.hasPersistentLocalStore());
+    }
+
+    @Test
+    public void persistentLocalStoreShouldBeDetected() {
+        final ProcessorTopology processorTopology = createLocalStoreTopology(Stores.persistentKeyValueStore("my-store"));
+        assertTrue(processorTopology.hasPersistentLocalStore());
+    }
+
+    @Test
+    public void inMemoryStoreShouldNotResultInPersistentGlobalStore() {
+        final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
+        assertFalse(processorTopology.hasPersistentGlobalStore());
+    }
+
+    @Test
+    public void persistentGlobalStoreShouldBeDetected() {
+        final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.persistentKeyValueStore("my-store"));
+        assertTrue(processorTopology.hasPersistentGlobalStore());
+    }
+
+    private ProcessorTopology createLocalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
+        final TopologyWrapper topology = new TopologyWrapper();
+        final String processor = "processor";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+                Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String());
+        topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic")
+                .addProcessor(processor, () -> new StatefulProcessor(storeSupplier.name()), "source")
+                .addStateStore(storeBuilder, processor);
+        return topology.getInternalBuilder("anyAppId").build();
+    }
+
+    private ProcessorTopology createGlobalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
+        final TopologyWrapper topology = new TopologyWrapper();
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+                Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()).withLoggingDisabled();
+        topology.addGlobalStore(storeBuilder, "global", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic", "processor",
+                define(new StatefulProcessor(storeSupplier.name())));
+        return topology.getInternalBuilder("anyAppId").build();
+    }
+
     private void assertNextOutputRecord(final String topic,
                                         final String key,
                                         final String value) {
@@ -416,12 +472,7 @@ public class ProcessorTopologyTest {
     }
 
     private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
-        return new StreamPartitioner<Object, Object>() {
-            @Override
-            public Integer partition(final String topic, final Object key, final Object value, final int numPartitions) {
-                return partition;
-            }
-        };
+        return (topic, key, value, numPartitions) -> partition;
     }
 
     private Topology createSimpleTopology(final int partition) {
@@ -619,12 +670,7 @@ public class ProcessorTopologyTest {
     }
 
     private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
-        return new ProcessorSupplier<K, V>() {
-            @Override
-            public Processor<K, V> get() {
-                return processor;
-            }
-        };
+        return () -> processor;
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index db48cb9..52599c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -160,7 +160,7 @@ public class StandbyTaskTest {
             new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
         ));
         baseDir = TestUtils.tempDirectory();
-        stateDirectory = new StateDirectory(createConfig(baseDir), new MockTime());
+        stateDirectory = new StateDirectory(createConfig(baseDir), new MockTime(), true);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index e14d010..edb02c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -52,9 +52,8 @@ public class StateDirectoryTest {
     private StateDirectory directory;
     private File appDir;
 
-    @Before
-    public void before() {
-        stateDir = new File(TestUtils.IO_TMP_DIR, TestUtils.randomString(5));
+    private void initializeStateDirectory(final boolean createStateDirectory) {
+        stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString(5));
         directory = new StateDirectory(
             new StreamsConfig(new Properties() {
                 {
@@ -63,10 +62,15 @@ public class StateDirectoryTest {
                     put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
                 }
             }),
-            time);
+            time, createStateDirectory);
         appDir = new File(stateDir, applicationId);
     }
 
+    @Before
+    public void before() {
+        initializeStateDirectory(true);
+    }
+
     @After
     public void cleanup() throws IOException {
         Utils.delete(stateDir);
@@ -138,7 +142,7 @@ public class StateDirectoryTest {
     }
     
     @Test
-    public void shouldLockMulitpleTaskDirectories() throws IOException {
+    public void shouldLockMultipleTaskDirectories() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         final File task1Dir = directory.directoryForTask(taskId);
         final TaskId taskId2 = new TaskId(1, 0);
@@ -254,7 +258,7 @@ public class StateDirectoryTest {
                     put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
                 }
             }),
-            time);
+            time, true);
         final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0));
         assertTrue(stateDir.exists());
         assertTrue(taskDir.exists());
@@ -296,14 +300,11 @@ public class StateDirectoryTest {
     public void shouldNotLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
         final TaskId taskId = new TaskId(0, 0);
         final AtomicReference<IOException> exceptionOnThread = new AtomicReference<>();
-        final Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    directory.lock(taskId);
-                } catch (final IOException e) {
-                    exceptionOnThread.set(e);
-                }
+        final Thread thread = new Thread(() -> {
+            try {
+                directory.lock(taskId);
+            } catch (final IOException e) {
+                exceptionOnThread.set(e);
             }
         });
         thread.start();
@@ -318,17 +319,14 @@ public class StateDirectoryTest {
         final CountDownLatch lockLatch = new CountDownLatch(1);
         final CountDownLatch unlockLatch = new CountDownLatch(1);
         final AtomicReference<Exception> exceptionOnThread = new AtomicReference<>();
-        final Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    directory.lock(taskId);
-                    lockLatch.countDown();
-                    unlockLatch.await();
-                    directory.unlock(taskId);
-                } catch (final Exception e) {
-                    exceptionOnThread.set(e);
-                }
+        final Thread thread = new Thread(() -> {
+            try {
+                directory.lock(taskId);
+                lockLatch.countDown();
+                unlockLatch.await();
+                directory.unlock(taskId);
+            } catch (final Exception e) {
+                exceptionOnThread.set(e);
             }
         });
         thread.start();
@@ -358,4 +356,39 @@ public class StateDirectoryTest {
         files = Arrays.asList(appDir.listFiles());
         assertEquals(0, files.size());
     }
+
+    @Test
+    public void shouldNotCreateBaseDirectory() {
+        initializeStateDirectory(false);
+        assertFalse(stateDir.exists());
+        assertFalse(appDir.exists());
+    }
+
+    @Test
+    public void shouldNotCreateTaskStateDirectory() {
+        initializeStateDirectory(false);
+        final TaskId taskId = new TaskId(0, 0);
+        final File taskDirectory = directory.directoryForTask(taskId);
+        assertFalse(taskDirectory.exists());
+    }
+
+    @Test
+    public void shouldNotCreateGlobalStateDirectory() {
+        initializeStateDirectory(false);
+        final File globalStateDir = directory.globalStateDir();
+        assertFalse(globalStateDir.exists());
+    }
+
+    @Test
+    public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
+        initializeStateDirectory(false);
+        final TaskId taskId = new TaskId(0, 0);
+        assertTrue(directory.lock(taskId));
+    }
+
+    @Test
+    public void shouldLockGlobalStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
+        initializeStateDirectory(false);
+        assertTrue(directory.lockGlobalState());
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index b042e3c..d4b8511 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -167,7 +167,7 @@ public class StreamTaskTest {
     @Before
     public void setup() {
         consumer.assign(asList(partition1, partition2));
-        stateDirectory = new StateDirectory(createConfig(false), new MockTime());
+        stateDirectory = new StateDirectory(createConfig(false), new MockTime(), true);
     }
 
     @After
@@ -968,7 +968,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldWrapProducerFencedExceptionWithTaskMigragedExceptionForBeginTransaction() {
+    public void shouldWrapProducerFencedExceptionWithTaskMigratedExceptionForBeginTransaction() {
         task = createStatelessTask(createConfig(true));
         producer.fenceProducer();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 11474a3..42e22f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -112,7 +112,7 @@ public class StreamThreadTest {
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
     private final StreamsConfig config = new StreamsConfig(configProps(false));
     private final String stateDir = TestUtils.tempDirectory().getPath();
-    private final StateDirectory stateDirectory = new StateDirectory(config, mockTime);
+    private final StateDirectory stateDirectory = new StateDirectory(config, mockTime, true);
     private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
 
     private UUID processId = UUID.randomUUID();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index ca059b4..9679429 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -99,7 +99,7 @@ public class StreamThreadStateStoreProviderTest {
         final ProcessorTopology processorTopology = topology.getInternalBuilder(applicationId).build();
 
         tasks = new HashMap<>();
-        stateDirectory = new StateDirectory(streamsConfig, new MockTime());
+        stateDirectory = new StateDirectory(streamsConfig, new MockTime(), true);
 
         taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 0));
         taskOne.initializeStateStores();
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index a11ae6b..a90afe7 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -246,6 +246,8 @@ public class TopologyTestDriver implements Closeable {
 
         processorTopology = internalTopologyBuilder.build(null);
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean createStateDirectory = processorTopology.hasPersistentLocalStore() ||
+                (globalTopology != null && globalTopology.hasPersistentGlobalStore());
 
         final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
         producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
@@ -256,7 +258,7 @@ public class TopologyTestDriver implements Closeable {
         };
 
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
+        stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime, createStateDirectory);
 
         final MetricConfig metricConfig = new MetricConfig()
             .samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 3e95c73..9f1c3a7 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import java.io.File;
 import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -41,6 +42,8 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -72,6 +75,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -773,14 +777,18 @@ public class TopologyTestDriverTest {
     }
 
     private void setup() {
+        setup(Stores.inMemoryKeyValueStore("aggStore"));
+    }
+
+    private void setup(final KeyValueBytesStoreSupplier storeSupplier) {
         final Topology topology = new Topology();
         topology.addSource("sourceProcessor", "input-topic");
         topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
         topology.addStateStore(Stores.keyValueStoreBuilder(
-            Stores.inMemoryKeyValueStore("aggStore"),
-            Serdes.String(),
-            Serdes.Long()),
-            "aggregator");
+                storeSupplier,
+                Serdes.String(),
+                Serdes.Long()),
+                "aggregator");
         topology.addSink("sinkProcessor", "result-topic", "aggregator");
 
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
@@ -1070,4 +1078,25 @@ public class TopologyTestDriverTest {
             assertEquals(str, exception.getMessage());
         }
     }
+
+    @Test
+    public void shouldNotCreateStateDirectoryForStatelessTopology() {
+        setup();
+        final String stateDir = config.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+        final File appDir = new File(stateDir, config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG));
+        assertFalse(appDir.exists());
+    }
+
+    @Test
+    public void shouldCreateStateDirectoryForStatefulTopology() {
+        setup(Stores.persistentKeyValueStore("aggStore"));
+        final String stateDir = config.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+        final File appDir = new File(stateDir, config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG));
+
+        assertTrue(appDir.exists());
+        assertTrue(appDir.isDirectory());
+
+        final TaskId taskId = new TaskId(0, 0);
+        assertTrue(new File(appDir, taskId.toString()).exists());
+    }
 }