You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/11/27 21:09:52 UTC
[kafka] branch trunk updated: KAFKA-7367: Streams should not create state store directories unless they are needed (#5696)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new de24d4a KAFKA-7367: Streams should not create state store directories unless they are needed (#5696)
de24d4a is described below
commit de24d4a459ccfac623e71722f8df8b1e99f2ad43
Author: Kamal Chandraprakash <ka...@gmail.com>
AuthorDate: Wed Nov 28 02:39:44 2018 +0530
KAFKA-7367: Streams should not create state store directories unless they are needed (#5696)
* KAFKA-7367: Ensure stateless topologies don't require disk access
* KAFKA-7367: Streams should not create state store directories unless they are needed.
* Addressed the review comments.
* Addressed the review-2 comments.
* Fixed FileAlreadyExistsException
* Addressed the review-3 comments.
* Resolved the conflicts.
---
.../org/apache/kafka/streams/KafkaStreams.java | 16 +-
.../processor/internals/ProcessorTopology.java | 18 +++
.../processor/internals/StateDirectory.java | 37 +++--
.../org/apache/kafka/streams/KafkaStreamsTest.java | 161 ++++++++++++++++++++-
.../integration/RestoreIntegrationTest.java | 4 +-
.../processor/internals/AbstractTaskTest.java | 2 +-
.../internals/GlobalStateManagerImplTest.java | 12 +-
.../internals/GlobalStreamThreadTest.java | 4 +-
.../internals/ProcessorStateManagerTest.java | 4 +-
.../processor/internals/ProcessorTopologyTest.java | 70 +++++++--
.../processor/internals/StandbyTaskTest.java | 2 +-
.../processor/internals/StateDirectoryTest.java | 83 +++++++----
.../processor/internals/StreamTaskTest.java | 4 +-
.../processor/internals/StreamThreadTest.java | 2 +-
.../StreamThreadStateStoreProviderTest.java | 2 +-
.../apache/kafka/streams/TopologyTestDriver.java | 4 +-
.../kafka/streams/TopologyTestDriverTest.java | 37 ++++-
17 files changed, 375 insertions(+), 87 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bbda11d..c29b7bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -644,12 +644,6 @@ public class KafkaStreams implements AutoCloseable {
final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
- try {
- stateDirectory = new StateDirectory(config, time);
- } catch (final ProcessorStateException fatal) {
- throw new StreamsException(fatal);
- }
-
final MetricConfig metricConfig = new MetricConfig()
.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -664,7 +658,7 @@ public class KafkaStreams implements AutoCloseable {
internalTopologyBuilder.rewriteTopology(config);
// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
- internalTopologyBuilder.build();
+ final ProcessorTopology taskTopology = internalTopologyBuilder.build();
streamsMetadataState = new StreamsMetadataState(
internalTopologyBuilder,
@@ -680,6 +674,14 @@ public class KafkaStreams implements AutoCloseable {
}
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
+ final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
+ (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
+
+ try {
+ stateDirectory = new StateDirectory(config, time, createStateDirectory);
+ } catch (final ProcessorStateException fatal) {
+ throw new StreamsException(fatal);
+ }
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
GlobalStreamThread.State globalThreadState = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 8fcbbb4..57af1f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -152,6 +152,24 @@ public class ProcessorTopology {
return repartitionTopics.contains(topic);
}
+ public boolean hasPersistentLocalStore() {
+ for (final StateStore store : stateStores) {
+ if (store.persistent()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasPersistentGlobalStore() {
+ for (final StateStore store : globalStateStores) {
+ if (store.persistent()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private String childrenToString(final String indent, final List<ProcessorNode<?, ?>> children) {
if (children == null || children.isEmpty()) {
return "";
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index a3227ca..f5c4c31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
@@ -50,6 +49,7 @@ public class StateDirectory {
private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
private final File stateDir;
+ private final boolean createStateDirectory;
private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
private final Time time;
@@ -71,19 +71,21 @@ public class StateDirectory {
* Ensures that the state base directory as well as the application's sub-directory are created.
*
* @throws ProcessorStateException if the base state directory or application state directory does not exist
- * and could not be created
+ * and could not be created when createStateDirectory is enabled.
*/
public StateDirectory(final StreamsConfig config,
- final Time time) {
+ final Time time,
+ final boolean createStateDirectory) {
this.time = time;
+ this.createStateDirectory = createStateDirectory;
final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
final File baseDir = new File(stateDirName);
- if (!baseDir.exists() && !baseDir.mkdirs()) {
+ if (this.createStateDirectory && !baseDir.exists() && !baseDir.mkdirs()) {
throw new ProcessorStateException(
String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
}
stateDir = new File(baseDir, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
- if (!stateDir.exists() && !stateDir.mkdir()) {
+ if (this.createStateDirectory && !stateDir.exists() && !stateDir.mkdir()) {
throw new ProcessorStateException(
String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
}
@@ -96,7 +98,7 @@ public class StateDirectory {
*/
public File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
- if (!taskDir.exists() && !taskDir.mkdir()) {
+ if (createStateDirectory && !taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(
String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
}
@@ -110,7 +112,7 @@ public class StateDirectory {
*/
File globalStateDir() {
final File dir = new File(stateDir, "global");
- if (!dir.exists() && !dir.mkdir()) {
+ if (createStateDirectory && !dir.exists() && !dir.mkdir()) {
throw new ProcessorStateException(
String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
}
@@ -128,6 +130,9 @@ public class StateDirectory {
* @throws IOException
*/
synchronized boolean lock(final TaskId taskId) throws IOException {
+ if (!createStateDirectory) {
+ return true;
+ }
final File lockFile;
// we already have the lock so bail out here
@@ -169,6 +174,10 @@ public class StateDirectory {
}
synchronized boolean lockGlobalState() throws IOException {
+ if (!createStateDirectory) {
+ return true;
+ }
+
if (globalStateLock != null) {
log.trace("{} Found cached state dir lock for the global task", logPrefix());
return true;
@@ -234,7 +243,9 @@ public class StateDirectory {
throw new StreamsException(e);
}
try {
- Utils.delete(globalStateDir().getAbsoluteFile());
+ if (stateDir.exists()) {
+ Utils.delete(globalStateDir().getAbsoluteFile());
+ }
} catch (final IOException e) {
log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e);
throw new StreamsException(e);
@@ -320,12 +331,8 @@ public class StateDirectory {
* @return The list of all the existing local directories for stream tasks
*/
File[] listTaskDirectories() {
- return stateDir.listFiles(new FileFilter() {
- @Override
- public boolean accept(final File pathname) {
- return pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches();
- }
- });
+ return !stateDir.exists() ? new File[0] :
+ stateDir.listFiles(pathname -> pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches());
}
private FileChannel getOrCreateFileChannel(final TaskId taskId,
@@ -344,6 +351,4 @@ public class StateDirectory {
}
}
-
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index fde5bff..3ca0dcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.time.Duration;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -27,27 +30,37 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
import java.io.File;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +70,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
@@ -79,6 +93,9 @@ public class KafkaStreamsTest {
private KafkaStreams globalStreams;
private Properties props;
+ @Rule
+ public TestName testName = new TestName();
+
@Before
public void before() {
props = new Properties();
@@ -125,8 +142,8 @@ public class KafkaStreamsTest {
final StateListenerStub stateListener = new StateListenerStub();
globalStreams.setStateListener(stateListener);
- Assert.assertEquals(globalStreams.state(), KafkaStreams.State.CREATED);
- Assert.assertEquals(stateListener.numChanges, 0);
+ Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
+ Assert.assertEquals(0, stateListener.numChanges);
globalStreams.start();
TestUtils.waitForCondition(
@@ -136,7 +153,7 @@ public class KafkaStreamsTest {
globalStreams.close();
- Assert.assertEquals(globalStreams.state(), KafkaStreams.State.NOT_RUNNING);
+ Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
}
@Test
@@ -158,7 +175,7 @@ public class KafkaStreamsTest {
builder.globalTable("anyTopic");
final List<Node> nodes = asList(new Node(0, "localhost", 8121));
final Cluster cluster = new Cluster("mockClusterId", nodes,
- Collections.emptySet(), Collections.<String>emptySet(),
+ Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
final MockClientSupplier clientSupplier = new MockClientSupplier();
clientSupplier.setClusterForAdminClient(cluster);
@@ -349,6 +366,7 @@ public class KafkaStreamsTest {
try {
globalStreams.start();
+ fail("Should throw an IllegalStateException");
} catch (final IllegalStateException e) {
// this is ok
} finally {
@@ -575,6 +593,141 @@ public class KafkaStreamsTest {
}
}
+ @Test
+ public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
+ final String inputTopic = testName.getMethodName() + "-input";
+ final String outputTopic = testName.getMethodName() + "-output";
+ CLUSTER.createTopics(inputTopic, outputTopic);
+
+ final Topology topology = new Topology();
+ topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
+ .addProcessor("process", () -> new AbstractProcessor<String, String>() {
+ @Override
+ public void process(final String key, final String value) {
+ if (value.length() % 2 == 0) {
+ context().forward(key, key + value);
+ }
+ }
+ }, "source")
+ .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process");
+ startStreamsAndCheckDirExists(topology, Collections.singleton(inputTopic), outputTopic, false);
+ }
+
+ @Test
+ public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
+ final String inputTopic = testName.getMethodName() + "-input";
+ final String outputTopic = testName.getMethodName() + "-output";
+ final String globalTopicName = testName.getMethodName() + "-global";
+ final String storeName = testName.getMethodName() + "-counts";
+ final String globalStoreName = testName.getMethodName() + "-globalStore";
+ final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, false);
+ startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, false);
+ }
+
+ @Test
+ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
+ final String inputTopic = testName.getMethodName() + "-input";
+ final String outputTopic = testName.getMethodName() + "-output";
+ final String globalTopicName = testName.getMethodName() + "-global";
+ final String storeName = testName.getMethodName() + "-counts";
+ final String globalStoreName = testName.getMethodName() + "-globalStore";
+ final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true);
+ startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, true);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Topology getStatefulTopology(final String inputTopic,
+ final String outputTopic,
+ final String globalTopicName,
+ final String storeName,
+ final String globalStoreName,
+ final boolean isPersistentStore) throws Exception {
+ CLUSTER.createTopics(inputTopic, outputTopic, globalTopicName);
+ final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(isPersistentStore ?
+ Stores.persistentKeyValueStore(storeName) : Stores.inMemoryKeyValueStore(storeName),
+ Serdes.String(), Serdes.Long());
+ final Topology topology = new Topology();
+ topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
+ .addProcessor("process", () -> new AbstractProcessor<String, String>() {
+ @Override
+ public void process(final String key, final String value) {
+ final KeyValueStore<String, Long> kvStore =
+ (KeyValueStore<String, Long>) context().getStateStore(storeName);
+ kvStore.put(key, 5L);
+
+ context().forward(key, "5");
+ context().commit();
+ }
+ }, "source")
+ .addStateStore(storeBuilder, "process")
+ .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process");
+
+ final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores.keyValueStoreBuilder(
+ isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName),
+ Serdes.String(), Serdes.String()).withLoggingDisabled();
+ topology.addGlobalStore(globalStoreBuilder,
+ "global",
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer(),
+ globalTopicName,
+ globalTopicName + "-processor",
+ new MockProcessorSupplier());
+ return topology;
+ }
+
+ private void startStreamsAndCheckDirExists(final Topology topology,
+ final Collection<String> inputTopics,
+ final String outputTopic,
+ final boolean shouldFilesExist) throws Exception {
+ final File baseDir = new File(TestUtils.IO_TMP_DIR + File.separator + "kafka-" + TestUtils.randomString(5));
+ final Path basePath = baseDir.toPath();
+ if (!baseDir.exists()) {
+ Files.createDirectory(basePath);
+ }
+ // changing the path of state directory to make sure that it should not clash with other test cases.
+ final Properties localProps = new Properties();
+ localProps.putAll(props);
+ localProps.put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getAbsolutePath());
+
+ final KafkaStreams streams = new KafkaStreams(topology, localProps);
+ streams.start();
+
+ for (final String topic : inputTopics) {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
+ Collections.singletonList(new KeyValue<>("A", "A")),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ System.currentTimeMillis());
+ }
+
+ IntegrationTestUtils.readKeyValues(outputTopic,
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ outputTopic + "-group",
+ StringDeserializer.class,
+ StringDeserializer.class),
+ 5000, 1);
+
+ try {
+ final List<Path> files = Files.find(basePath, 999, (p, bfa) -> !p.equals(basePath)).collect(Collectors.toList());
+ if (shouldFilesExist && files.isEmpty()) {
+ Assert.fail("Files should have existed, but it didn't: " + files);
+ }
+ if (!shouldFilesExist && !files.isEmpty()) {
+ Assert.fail("Files should not have existed, but it did: " + files);
+ }
+ } catch (final IOException e) {
+ Assert.fail("Couldn't read the state directory : " + baseDir.getPath());
+ } finally {
+ streams.close();
+ streams.cleanUp();
+ Utils.delete(baseDir);
+ }
+ }
+
private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
final File taskDir = new File(appDir, "0_0");
TestUtils.waitForCondition(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 81938f8..c3c45db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -132,7 +132,7 @@ public class RestoreIntegrationTest {
createStateForRestoration(INPUT_STREAM);
setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
- final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime());
+ final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true);
new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
.write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed));
new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
@@ -199,7 +199,7 @@ public class RestoreIntegrationTest {
createStateForRestoration(APPID + "-store-changelog");
createStateForRestoration(INPUT_STREAM);
- final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime());
+ final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true);
new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
.write(Collections.singletonMap(new TopicPartition(APPID + "-store-changelog", 0), (long) offsetCheckpointed));
new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 4ed44be..b3afa24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -152,7 +152,7 @@ public class AbstractTaskTest {
expect(store4.name()).andReturn(storeName4).anyTimes();
EasyMock.replay(store4);
- final StateDirectory stateDirectory = new StateDirectory(streamsConfig, new MockTime());
+ final StateDirectory stateDirectory = new StateDirectory(streamsConfig, new MockTime(), true);
final AbstractTask task = createTask(
consumer,
new HashMap<StateStore, String>() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 8789e2b..8b02b3e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -112,7 +112,7 @@ public class GlobalStateManagerImplTest {
put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
}
});
- stateDirectory = new StateDirectory(streamsConfig, time);
+ stateDirectory = new StateDirectory(streamsConfig, time, true);
consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
stateManager = new GlobalStateManagerImpl(
new LogContext("test"),
@@ -139,7 +139,7 @@ public class GlobalStateManagerImplTest {
@Test(expected = LockException.class)
public void shouldThrowLockExceptionIfCantGetLock() throws IOException {
- final StateDirectory stateDir = new StateDirectory(streamsConfig, time);
+ final StateDirectory stateDir = new StateDirectory(streamsConfig, time, true);
try {
stateDir.lockGlobalState();
stateManager.initialize();
@@ -357,8 +357,8 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
stateManager.initialize();
- stateManager.close(Collections.<TopicPartition, Long>emptyMap());
- final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime());
+ stateManager.close(Collections.emptyMap());
+ final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true);
try {
// should be able to get the lock now as it should've been released in close
assertTrue(stateDir.lockGlobalState());
@@ -419,7 +419,7 @@ public class GlobalStateManagerImplTest {
} catch (final StreamsException e) {
// expected
}
- final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime());
+ final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true);
try {
// should be able to get the lock now as it should've been released
assertTrue(stateDir.lockGlobalState());
@@ -511,7 +511,7 @@ public class GlobalStateManagerImplTest {
new LogContext("mock"),
topology,
consumer,
- new StateDirectory(streamsConfig, time) {
+ new StateDirectory(streamsConfig, time, true) {
@Override
public boolean lockGlobalState() throws IOException {
throw new IOException("KABOOM!");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 37a6fdb..387179b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -107,7 +107,7 @@ public class GlobalStreamThreadTest {
globalStreamThread = new GlobalStreamThread(builder.rewriteTopology(config).buildGlobalStateTopology(),
config,
mockConsumer,
- new StateDirectory(config, time),
+ new StateDirectory(config, time, true),
0,
new Metrics(),
new MockTime(),
@@ -140,7 +140,7 @@ public class GlobalStreamThreadTest {
globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
config,
mockConsumer,
- new StateDirectory(config, time),
+ new StateDirectory(config, time, true),
0,
new Metrics(),
new MockTime(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index fbcb2c8..3d91ee0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -98,7 +98,7 @@ public class ProcessorStateManagerTest {
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath());
}
- }), new MockTime());
+ }), new MockTime(), true);
checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
checkpoint = new OffsetCheckpoint(checkpointFile);
}
@@ -565,7 +565,7 @@ public class ProcessorStateManagerTest {
}
@Test
- public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws IOException {
+ public void shouldFlushAllStoresEvenIfStoreThrowsException() throws IOException {
final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Collections.singleton(changelogTopicPartition),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 587cae2..11050fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -38,7 +38,9 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -54,8 +56,10 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class ProcessorTopologyTest {
@@ -367,6 +371,58 @@ public class ProcessorTopologyTest {
assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, 30L);
}
+ @Test
+ public void statelessTopologyShouldNotHavePersistentStore() {
+ final TopologyWrapper topology = new TopologyWrapper();
+ final ProcessorTopology processorTopology = topology.getInternalBuilder("anyAppId").build();
+ assertFalse(processorTopology.hasPersistentLocalStore());
+ assertFalse(processorTopology.hasPersistentGlobalStore());
+ }
+
+ @Test
+ public void inMemoryStoreShouldNotResultInPersistentLocalStore() {
+ final ProcessorTopology processorTopology = createLocalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
+ assertFalse(processorTopology.hasPersistentLocalStore());
+ }
+
+ @Test
+ public void persistentLocalStoreShouldBeDetected() {
+ final ProcessorTopology processorTopology = createLocalStoreTopology(Stores.persistentKeyValueStore("my-store"));
+ assertTrue(processorTopology.hasPersistentLocalStore());
+ }
+
+ @Test
+ public void inMemoryStoreShouldNotResultInPersistentGlobalStore() {
+ final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
+ assertFalse(processorTopology.hasPersistentGlobalStore());
+ }
+
+ @Test
+ public void persistentGlobalStoreShouldBeDetected() {
+ final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.persistentKeyValueStore("my-store"));
+ assertTrue(processorTopology.hasPersistentGlobalStore());
+ }
+
+ private ProcessorTopology createLocalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
+ final TopologyWrapper topology = new TopologyWrapper();
+ final String processor = "processor";
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+ Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String());
+ topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic")
+ .addProcessor(processor, () -> new StatefulProcessor(storeSupplier.name()), "source")
+ .addStateStore(storeBuilder, processor);
+ return topology.getInternalBuilder("anyAppId").build();
+ }
+
+ private ProcessorTopology createGlobalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
+ final TopologyWrapper topology = new TopologyWrapper();
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+ Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()).withLoggingDisabled();
+ topology.addGlobalStore(storeBuilder, "global", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic", "processor",
+ define(new StatefulProcessor(storeSupplier.name())));
+ return topology.getInternalBuilder("anyAppId").build();
+ }
+
private void assertNextOutputRecord(final String topic,
final String key,
final String value) {
@@ -416,12 +472,7 @@ public class ProcessorTopologyTest {
}
private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
- return new StreamPartitioner<Object, Object>() {
- @Override
- public Integer partition(final String topic, final Object key, final Object value, final int numPartitions) {
- return partition;
- }
- };
+ return (topic, key, value, numPartitions) -> partition;
}
private Topology createSimpleTopology(final int partition) {
@@ -619,12 +670,7 @@ public class ProcessorTopologyTest {
}
private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
- return new ProcessorSupplier<K, V>() {
- @Override
- public Processor<K, V> get() {
- return processor;
- }
- };
+ return () -> processor;
}
/**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index db48cb9..52599c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -160,7 +160,7 @@ public class StandbyTaskTest {
new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
));
baseDir = TestUtils.tempDirectory();
- stateDirectory = new StateDirectory(createConfig(baseDir), new MockTime());
+ stateDirectory = new StateDirectory(createConfig(baseDir), new MockTime(), true);
}
@After
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index e14d010..edb02c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -52,9 +52,8 @@ public class StateDirectoryTest {
private StateDirectory directory;
private File appDir;
- @Before
- public void before() {
- stateDir = new File(TestUtils.IO_TMP_DIR, TestUtils.randomString(5));
+ private void initializeStateDirectory(final boolean createStateDirectory) {
+ stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString(5));
directory = new StateDirectory(
new StreamsConfig(new Properties() {
{
@@ -63,10 +62,15 @@ public class StateDirectoryTest {
put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
}
}),
- time);
+ time, createStateDirectory);
appDir = new File(stateDir, applicationId);
}
+ @Before
+ public void before() {
+ initializeStateDirectory(true);
+ }
+
@After
public void cleanup() throws IOException {
Utils.delete(stateDir);
@@ -138,7 +142,7 @@ public class StateDirectoryTest {
}
@Test
- public void shouldLockMulitpleTaskDirectories() throws IOException {
+ public void shouldLockMultipleTaskDirectories() throws IOException {
final TaskId taskId = new TaskId(0, 0);
final File task1Dir = directory.directoryForTask(taskId);
final TaskId taskId2 = new TaskId(1, 0);
@@ -254,7 +258,7 @@ public class StateDirectoryTest {
put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
}
}),
- time);
+ time, true);
final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0));
assertTrue(stateDir.exists());
assertTrue(taskDir.exists());
@@ -296,14 +300,11 @@ public class StateDirectoryTest {
public void shouldNotLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
final TaskId taskId = new TaskId(0, 0);
final AtomicReference<IOException> exceptionOnThread = new AtomicReference<>();
- final Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- directory.lock(taskId);
- } catch (final IOException e) {
- exceptionOnThread.set(e);
- }
+ final Thread thread = new Thread(() -> {
+ try {
+ directory.lock(taskId);
+ } catch (final IOException e) {
+ exceptionOnThread.set(e);
}
});
thread.start();
@@ -318,17 +319,14 @@ public class StateDirectoryTest {
final CountDownLatch lockLatch = new CountDownLatch(1);
final CountDownLatch unlockLatch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionOnThread = new AtomicReference<>();
- final Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- directory.lock(taskId);
- lockLatch.countDown();
- unlockLatch.await();
- directory.unlock(taskId);
- } catch (final Exception e) {
- exceptionOnThread.set(e);
- }
+ final Thread thread = new Thread(() -> {
+ try {
+ directory.lock(taskId);
+ lockLatch.countDown();
+ unlockLatch.await();
+ directory.unlock(taskId);
+ } catch (final Exception e) {
+ exceptionOnThread.set(e);
}
});
thread.start();
@@ -358,4 +356,39 @@ public class StateDirectoryTest {
files = Arrays.asList(appDir.listFiles());
assertEquals(0, files.size());
}
+
+ @Test
+ public void shouldNotCreateBaseDirectory() {
+ initializeStateDirectory(false);
+ assertFalse(stateDir.exists());
+ assertFalse(appDir.exists());
+ }
+
+ @Test
+ public void shouldNotCreateTaskStateDirectory() {
+ initializeStateDirectory(false);
+ final TaskId taskId = new TaskId(0, 0);
+ final File taskDirectory = directory.directoryForTask(taskId);
+ assertFalse(taskDirectory.exists());
+ }
+
+ @Test
+ public void shouldNotCreateGlobalStateDirectory() {
+ initializeStateDirectory(false);
+ final File globalStateDir = directory.globalStateDir();
+ assertFalse(globalStateDir.exists());
+ }
+
+ @Test
+ public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
+ initializeStateDirectory(false);
+ final TaskId taskId = new TaskId(0, 0);
+ assertTrue(directory.lock(taskId));
+ }
+
+ @Test
+ public void shouldLockGlobalStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
+ initializeStateDirectory(false);
+ assertTrue(directory.lockGlobalState());
+ }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index b042e3c..d4b8511 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -167,7 +167,7 @@ public class StreamTaskTest {
@Before
public void setup() {
consumer.assign(asList(partition1, partition2));
- stateDirectory = new StateDirectory(createConfig(false), new MockTime());
+ stateDirectory = new StateDirectory(createConfig(false), new MockTime(), true);
}
@After
@@ -968,7 +968,7 @@ public class StreamTaskTest {
}
@Test
- public void shouldWrapProducerFencedExceptionWithTaskMigragedExceptionForBeginTransaction() {
+ public void shouldWrapProducerFencedExceptionWithTaskMigratedExceptionForBeginTransaction() {
task = createStatelessTask(createConfig(true));
producer.fenceProducer();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 11474a3..42e22f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -112,7 +112,7 @@ public class StreamThreadTest {
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
private final StreamsConfig config = new StreamsConfig(configProps(false));
private final String stateDir = TestUtils.tempDirectory().getPath();
- private final StateDirectory stateDirectory = new StateDirectory(config, mockTime);
+ private final StateDirectory stateDirectory = new StateDirectory(config, mockTime, true);
private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
private UUID processId = UUID.randomUUID();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index ca059b4..9679429 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -99,7 +99,7 @@ public class StreamThreadStateStoreProviderTest {
final ProcessorTopology processorTopology = topology.getInternalBuilder(applicationId).build();
tasks = new HashMap<>();
- stateDirectory = new StateDirectory(streamsConfig, new MockTime());
+ stateDirectory = new StateDirectory(streamsConfig, new MockTime(), true);
taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 0));
taskOne.initializeStateStores();
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index a11ae6b..a90afe7 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -246,6 +246,8 @@ public class TopologyTestDriver implements Closeable {
processorTopology = internalTopologyBuilder.build(null);
globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
+ final boolean createStateDirectory = processorTopology.hasPersistentLocalStore() ||
+ (globalTopology != null && globalTopology.hasPersistentGlobalStore());
final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
@@ -256,7 +258,7 @@ public class TopologyTestDriver implements Closeable {
};
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
+ stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime, createStateDirectory);
final MetricConfig metricConfig = new MetricConfig()
.samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 3e95c73..9f1c3a7 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;
+import java.io.File;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -41,6 +42,8 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@@ -72,6 +75,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -773,14 +777,18 @@ public class TopologyTestDriverTest {
}
private void setup() {
+ setup(Stores.inMemoryKeyValueStore("aggStore"));
+ }
+
+ private void setup(final KeyValueBytesStoreSupplier storeSupplier) {
final Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
topology.addStateStore(Stores.keyValueStoreBuilder(
- Stores.inMemoryKeyValueStore("aggStore"),
- Serdes.String(),
- Serdes.Long()),
- "aggregator");
+ storeSupplier,
+ Serdes.String(),
+ Serdes.Long()),
+ "aggregator");
topology.addSink("sinkProcessor", "result-topic", "aggregator");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
@@ -1070,4 +1078,25 @@ public class TopologyTestDriverTest {
assertEquals(str, exception.getMessage());
}
}
+
+ @Test
+ public void shouldNotCreateStateDirectoryForStatelessTopology() {
+ setup();
+ final String stateDir = config.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+ final File appDir = new File(stateDir, config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG));
+ assertFalse(appDir.exists());
+ }
+
+ @Test
+ public void shouldCreateStateDirectoryForStatefulTopology() {
+ setup(Stores.persistentKeyValueStore("aggStore"));
+ final String stateDir = config.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+ final File appDir = new File(stateDir, config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG));
+
+ assertTrue(appDir.exists());
+ assertTrue(appDir.isDirectory());
+
+ final TaskId taskId = new TaskId(0, 0);
+ assertTrue(new File(appDir, taskId.toString()).exists());
+ }
}