Posted to commits@kafka.apache.org by ab...@apache.org on 2021/02/05 19:26:25 UTC

[kafka] branch 2.7 updated: KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7 (#10060)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 62a58e1  KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7 (#10060)
62a58e1 is described below

commit 62a58e1b2807c7983819b4a45b5a73cbdadedc7b
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Feb 5 11:24:52 2021 -0800

    KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7 (#10060)
    
    To stabilize the task assignment across restarts of the JVM we need some way to persist the process-specific UUID. We can just write it to a file in the state directory, and initialize it from there or create a new one if no prior UUID exists. Port of PR #9978 to the 2.7 branch
    
    Reviewers: Walker Carlson <wc...@confluent.io>, Leah Thomas <lt...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  39 +--
 .../processor/internals/StateDirectory.java        | 104 ++++++++
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   3 +
 .../KStreamRepartitionIntegrationTest.java         |  20 +-
 ...bleForeignKeyInnerJoinMultiIntegrationTest.java |  45 ++--
 .../integration/StandbyTaskEOSIntegrationTest.java | 279 +++++++++++----------
 .../integration/StoreUpgradeIntegrationTest.java   |  18 +-
 .../integration/utils/IntegrationTestUtils.java    |  11 +
 .../processor/internals/StateDirectoryTest.java    |  74 ++++++
 9 files changed, 414 insertions(+), 179 deletions(-)
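
Before diving into the patch, here is a minimal standalone sketch (separate from the patch itself) of the read-or-create pattern the commit message describes: look for a small JSON metadata file in the state directory, reuse the persisted process id if one is readable, and otherwise generate and persist a fresh one. Class and method names below are illustrative; the real logic lives in StateDirectory#initializeProcessId() further down.

    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.File;
    import java.io.IOException;
    import java.util.UUID;

    // Illustrative, condensed version of the read-or-create logic; the real
    // implementation also locks the state directory first (see below).
    public class ProcessIdFileSketch {

        // Unknown fields are ignored so this reader also accepts files written
        // by future versions that add fields to the schema.
        @JsonIgnoreProperties(ignoreUnknown = true)
        static class ProcessFile {
            @JsonProperty
            UUID processId;

            ProcessFile() { }                      // required by Jackson

            ProcessFile(final UUID processId) {
                this.processId = processId;
            }
        }

        static UUID readOrCreateProcessId(final File stateDir) throws IOException {
            final File processFile = new File(stateDir, "kafka-streams-process-metadata");
            final ObjectMapper mapper = new ObjectMapper();

            if (processFile.exists()) {
                try {
                    final ProcessFile data = mapper.readValue(processFile, ProcessFile.class);
                    if (data.processId != null) {
                        return data.processId;     // reuse the persisted id
                    }
                } catch (final IOException swallowed) {
                    // unreadable or corrupt file: fall through and write a fresh id
                }
            }

            final ProcessFile fresh = new ProcessFile(UUID.randomUUID());
            mapper.writeValue(processFile, fresh);
            return fresh.processId;
        }
    }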

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b5e64e8..7ec6a0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -660,8 +660,25 @@ public class KafkaStreams implements AutoCloseable {
         this.config = config;
         this.time = time;
 
-        // The application ID is a required config and hence should always have value
-        final UUID processId = UUID.randomUUID();
+        // re-write the physical topology according to the config
+        internalTopologyBuilder.rewriteTopology(config);
+
+        // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
+        taskTopology = internalTopologyBuilder.buildTopology();
+        globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+        final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
+            (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
+
+        final UUID processId;
+        try {
+            stateDirectory = new StateDirectory(config, time, hasPersistentStores);
+            processId = stateDirectory.initializeProcessId();
+        } catch (final ProcessorStateException fatal) {
+            throw new StreamsException(fatal);
+        }
+
         final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         if (userClientId.length() <= 0) {
@@ -700,12 +717,6 @@ public class KafkaStreams implements AutoCloseable {
         log.info("Kafka Streams version: {}", ClientMetrics.version());
         log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
 
-        // re-write the physical topology according to the config
-        internalTopologyBuilder.rewriteTopology(config);
-
-        // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
-        taskTopology = internalTopologyBuilder.buildTopology();
-
         streamsMetadataState = new StreamsMetadataState(
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
@@ -721,9 +732,6 @@ public class KafkaStreams implements AutoCloseable {
         // create the stream thread, global update thread, and cleanup thread
         threads = new StreamThread[numStreamThreads];
 
-        globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
-        final boolean hasGlobalTopology = globalTaskTopology != null;
-
         if (numStreamThreads == 0 && !hasGlobalTopology) {
             log.error("Topology with no input topics will create no stream threads and no global thread.");
             throw new TopologyException("Topology has no stream threads and no global threads, " +
@@ -736,14 +744,6 @@ public class KafkaStreams implements AutoCloseable {
             log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
         }
         final long cacheSizePerThread = totalCacheSize / (threads.length + (hasGlobalTopology ? 1 : 0));
-        final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
-                (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
-
-        try {
-            stateDirectory = new StateDirectory(config, time, hasPersistentStores);
-        } catch (final ProcessorStateException fatal) {
-            throw new StreamsException(fatal);
-        }
 
         final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         GlobalStreamThread.State globalThreadState = null;
@@ -977,6 +977,7 @@ public class KafkaStreams implements AutoCloseable {
                     globalStreamThread = null;
                 }
 
+                stateDirectory.close();
                 adminClient.close();
 
                 streamsMetrics.removeAllClientLevelMetrics();
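
One consequence of the constructor changes above is worth spelling out: because the process id is now tied to a lock on the state directory, a second KafkaStreams instance pointed at the same state.dir (when persistent stores are in use) fails fast in the constructor with a StreamsException instead of quietly sharing the directory. A hedged sketch of that behavior; the topology helper, topic, and configuration values here are placeholders, not part of the patch.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.errors.StreamsException;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.Stores;

    public class SameStateDirSketch {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "same-state-dir-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams-demo");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            try (final KafkaStreams first = new KafkaStreams(topologyWithPersistentStore(), props)) {
                try {
                    // The second constructor cannot lock the state directory and throws,
                    // instead of handing out another random process id as before.
                    new KafkaStreams(topologyWithPersistentStore(), props);
                } catch (final StreamsException expected) {
                    // "Unable to initialize state, this can happen if multiple instances
                    //  of Kafka Streams are running in the same state directory"
                }
            }
        }

        // Placeholder topology with one persistent store; without any persistent
        // stores no directory lock is taken and no process file is written.
        private static Topology topologyWithPersistentStore() {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.table("demo-input", Materialized.as(Stores.persistentKeyValueStore("demo-store")));
            return builder.build();
        }
    }
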
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index e388b6a..97d0865 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -39,6 +42,7 @@ import java.nio.file.attribute.PosixFilePermissions;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Pattern;
 
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
@@ -54,6 +58,25 @@ public class StateDirectory {
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
     static final String LOCK_FILE_NAME = ".lock";
 
+    /* The process file is used to persist the process id across restarts.
+     * For compatibility reasons you should only ever add fields to the json schema
+     */
+    static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata";
+
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    static class StateDirectoryProcessFile {
+        @JsonProperty
+        private final UUID processId;
+
+        public StateDirectoryProcessFile() {
+            this.processId = null;
+        }
+
+        StateDirectoryProcessFile(final UUID processId) {
+            this.processId = processId;
+        }
+    }
+
     private final Object taskDirCreationLock = new Object();
     private final Time time;
     private final String appId;
@@ -62,6 +85,9 @@ public class StateDirectory {
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
 
+    private FileChannel stateDirLockChannel;
+    private FileLock stateDirLock;
+
     private FileChannel globalStateChannel;
     private FileLock globalStateLock;
 
@@ -134,6 +160,61 @@ public class StateDirectory {
     }
 
     /**
+     * @return true if the state directory was successfully locked
+     */
+    private boolean lockStateDirectory() {
+        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+        try {
+            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+            stateDirLock = tryLock(stateDirLockChannel);
+        } catch (final IOException e) {
+            log.error("Unable to lock the state directory due to unexpected exception", e);
+            throw new ProcessorStateException("Failed to lock the state directory during startup", e);
+        }
+
+        return stateDirLock != null;
+    }
+
+    public UUID initializeProcessId() {
+        if (!hasPersistentStores) {
+            return UUID.randomUUID();
+        }
+
+        if (!lockStateDirectory()) {
+            log.error("Unable to obtain lock as state directory is already locked by another process");
+            throw new StreamsException("Unable to initialize state, this can happen if multiple instances of " +
+                                           "Kafka Streams are running in the same state directory");
+        }
+
+        final File processFile = new File(stateDir, PROCESS_FILE_NAME);
+        final ObjectMapper mapper = new ObjectMapper();
+
+        try {
+            if (processFile.exists()) {
+                try {
+                    final StateDirectoryProcessFile processFileData = mapper.readValue(processFile, StateDirectoryProcessFile.class);
+                    log.info("Reading UUID from process file: {}", processFileData.processId);
+                    if (processFileData.processId != null) {
+                        return processFileData.processId;
+                    }
+                } catch (final Exception e) {
+                    log.warn("Failed to read json process file", e);
+                }
+            }
+
+            final StateDirectoryProcessFile processFileData = new StateDirectoryProcessFile(UUID.randomUUID());
+            log.info("No process id found on disk, got fresh process id {}", processFileData.processId);
+
+            mapper.writeValue(processFile, processFileData);
+            return processFileData.processId;
+        } catch (final IOException e) {
+            log.error("Unable to read/write process file due to unexpected exception", e);
+            throw new ProcessorStateException(e);
+        }
+    }
+
+
+    /**
      * Get or create the directory for the provided {@link TaskId}.
      * @return directory for the {@link TaskId}
      * @throws ProcessorStateException if the task directory does not exist and could not be created
@@ -310,6 +391,29 @@ public class StateDirectory {
         }
     }
 
+    public void close() {
+        if (hasPersistentStores) {
+            try {
+                stateDirLock.release();
+                stateDirLockChannel.close();
+
+                stateDirLock = null;
+                stateDirLockChannel = null;
+            } catch (final IOException e) {
+                log.error("Unexpected exception while unlocking the state dir", e);
+                throw new StreamsException("Failed to release the lock on the state directory", e);
+            }
+
+            // all threads should be stopped and cleaned up by now, so none should remain holding a lock
+            if (!locks.isEmpty()) {
+                log.error("Some task directories still locked while closing state, this indicates unclean shutdown: {}", locks);
+            }
+            if (globalStateLock != null) {
+                log.error("Global state lock is present while closing the state, this indicates unclean shutdown");
+            }
+        }
+    }
+
     public synchronized void clean() {
         // remove task dirs
         try {
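
The new lockStateDirectory() above reuses the same advisory java.nio file-lock mechanism the class already applies per task directory, just at the level of the whole state directory. A self-contained sketch of that mechanism, outside of Kafka (the class and file name are illustrative):

    import java.io.File;
    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.nio.channels.OverlappingFileLockException;
    import java.nio.file.StandardOpenOption;

    // Minimal illustration of an exclusive, advisory directory lock: whichever
    // process (or channel) acquires the lock on the marker file "owns" the directory.
    public class DirectoryLockSketch implements AutoCloseable {

        private final FileChannel channel;
        private final FileLock lock;

        private DirectoryLockSketch(final FileChannel channel, final FileLock lock) {
            this.channel = channel;
            this.lock = lock;
        }

        /** @return a held lock, or null if the directory is already locked */
        static DirectoryLockSketch tryLockDirectory(final File dir) throws IOException {
            final File lockFile = new File(dir, ".lock");
            final FileChannel channel = FileChannel.open(
                lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            final FileLock lock;
            try {
                lock = channel.tryLock();          // null if held by another process
            } catch (final OverlappingFileLockException e) {
                channel.close();
                return null;                       // already held within this JVM
            }
            if (lock == null) {
                channel.close();
                return null;
            }
            return new DirectoryLockSketch(channel, lock);
        }

        @Override
        public void close() throws IOException {
            lock.release();
            channel.close();
        }
    }

FileChannel#tryLock returns null when another process holds the lock and throws OverlappingFileLockException when the same JVM already holds it, which is why both cases collapse into "could not lock" here and in StateDirectory.
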
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index f259696..6e6a3ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -830,6 +830,8 @@ public class KafkaStreamsTest {
             anyObject(Time.class),
             EasyMock.eq(true)
         ).andReturn(stateDirectory);
+        EasyMock.expect(stateDirectory.initializeProcessId()).andReturn(UUID.randomUUID());
+        stateDirectory.close();
         PowerMock.replayAll(Executors.class, cleanupSchedule, stateDirectory);
 
         props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
@@ -999,6 +1001,7 @@ public class KafkaStreamsTest {
             anyObject(Time.class),
             EasyMock.eq(shouldFilesExist)
         ).andReturn(stateDirectory);
+        EasyMock.expect(stateDirectory.initializeProcessId()).andReturn(UUID.randomUUID());
 
         PowerMock.replayAll();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index a12bb85..ee1541d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -622,8 +622,10 @@ public class KStreamRepartitionIntegrationTest {
                .to(outputTopic);
 
         startStreams(builder);
-        final KafkaStreams kafkaStreamsToClose = startStreams(builder);
-
+        final Properties streamsToCloseConfigs = new Properties();
+        streamsToCloseConfigs.putAll(streamsConfiguration);
+        streamsToCloseConfigs.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + "-2");
+        final KafkaStreams kafkaStreamsToClose = startStreams(builder, streamsToCloseConfigs);
         validateReceivedMessages(
             new StringDeserializer(),
             new LongDeserializer(),
@@ -724,12 +726,24 @@ public class KStreamRepartitionIntegrationTest {
     }
 
     private KafkaStreams startStreams(final StreamsBuilder builder) throws InterruptedException {
-        return startStreams(builder, REBALANCING, RUNNING, null);
+        return startStreams(builder, REBALANCING, RUNNING, streamsConfiguration, null);
+    }
+
+    private KafkaStreams startStreams(final StreamsBuilder builder, final Properties streamsConfiguration) throws InterruptedException {
+        return startStreams(builder, REBALANCING, RUNNING, streamsConfiguration, null);
+    }
+
+    private KafkaStreams startStreams(final StreamsBuilder builder,
+                                      final State expectedOldState,
+                                      final State expectedNewState,
+                                      final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws InterruptedException {
+        return startStreams(builder, expectedOldState, expectedNewState, streamsConfiguration, uncaughtExceptionHandler);
     }
 
     private KafkaStreams startStreams(final StreamsBuilder builder,
                                       final State expectedOldState,
                                       final State expectedNewState,
+                                      final Properties streamsConfiguration,
                                       final UncaughtExceptionHandler uncaughtExceptionHandler) throws InterruptedException {
         final CountDownLatch latch;
         final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
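
The test change above is needed because, with the directory-level lock, two KafkaStreams instances in the same test JVM can no longer share one state.dir. A small hypothetical helper capturing the pattern the updated tests use (the helper itself is not part of the patch):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.test.TestUtils;

    // Hypothetical test helper: copy a base Streams configuration and point the
    // copy at its own temporary state directory so several instances can coexist
    // in one test JVM.
    final class TestConfigs {
        private TestConfigs() { }

        static Properties withOwnStateDir(final Properties baseConfig) {
            final Properties copy = new Properties();
            copy.putAll(baseConfig);
            copy.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
            return copy;
        }
    }
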
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index eeac500..b52b638 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -50,7 +50,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -58,6 +57,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 
 @Category({IntegrationTest.class})
@@ -71,7 +71,9 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
     private final static String TABLE_2 = "table2";
     private final static String TABLE_3 = "table3";
     private final static String OUTPUT = "output-";
-    private static Properties streamsConfig;
+    private final Properties streamsConfig = getStreamsConfig();
+    private final Properties streamsConfigTwo = getStreamsConfig();
+    private final Properties streamsConfigThree = getStreamsConfig();
     private KafkaStreams streams;
     private KafkaStreams streamsTwo;
     private KafkaStreams streamsThree;
@@ -105,14 +107,8 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         PRODUCER_CONFIG_3.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         PRODUCER_CONFIG_3.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
-        streamsConfig = new Properties();
-        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
 
-        final List<KeyValue<Integer, Float>> table1 = Arrays.asList(
+        final List<KeyValue<Integer, Float>> table1 = asList(
             new KeyValue<>(1, 1.33f),
             new KeyValue<>(2, 2.22f),
             new KeyValue<>(3, -1.22f), //Won't be joined in yet.
@@ -120,7 +116,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         );
 
         //Partitions pre-computed using the default Murmur2 hash, just to ensure that all 3 partitions will be exercised.
-        final List<KeyValue<String, Long>> table2 = Arrays.asList(
+        final List<KeyValue<String, Long>> table2 = asList(
             new KeyValue<>("0", 0L),  //partition 2
             new KeyValue<>("1", 10L), //partition 0
             new KeyValue<>("2", 20L), //partition 2
@@ -150,7 +146,12 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
 
     @Before
     public void before() throws IOException {
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
+        streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-2");
+        streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
+
+        IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree));
     }
 
     @After
@@ -187,11 +188,10 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
                                         final boolean verifyQueryableState) throws Exception {
         final String queryableName = verifyQueryableState ? joinType + "-store1" : null;
         final String queryableNameTwo = verifyQueryableState ? joinType + "-store2" : null;
-        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType + queryableName);
 
-        streams = prepareTopology(queryableName, queryableNameTwo);
-        streamsTwo = prepareTopology(queryableName, queryableNameTwo);
-        streamsThree = prepareTopology(queryableName, queryableNameTwo);
+        streams = prepareTopology(queryableName, queryableNameTwo, streamsConfig);
+        streamsTwo = prepareTopology(queryableName, queryableNameTwo, streamsConfigTwo);
+        streamsThree = prepareTopology(queryableName, queryableNameTwo, streamsConfigThree);
         streams.start();
         streamsTwo.start();
         streamsThree.start();
@@ -204,7 +204,20 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         assertEquals(expectedResult, result);
     }
 
-    private KafkaStreams prepareTopology(final String queryableName, final String queryableNameTwo) {
+    private static Properties getStreamsConfig() {
+        final Properties streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Multi");
+        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        return streamsConfig;
+    }
+
+    private static KafkaStreams prepareTopology(final String queryableName,
+                                                final String queryableNameTwo,
+                                                final Properties streamsConfig) {
+
         final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
         final StreamsBuilder builder = new StreamsBuilder();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index 93b4f2f..e456990 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -91,6 +92,10 @@ public class StandbyTaskEOSIntegrationTest {
     private String storeName;
     private String outputTopic;
 
+    private KafkaStreams streamInstanceOne;
+    private KafkaStreams streamInstanceTwo;
+    private KafkaStreams streamInstanceOneRecovery;
+
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
 
@@ -109,6 +114,19 @@ public class StandbyTaskEOSIntegrationTest {
         CLUSTER.createTopic(outputTopic, 1, 3);
     }
 
+    @After
+    public void cleanUp() {
+        if (streamInstanceOne != null) {
+            streamInstanceOne.close();
+        }
+        if (streamInstanceTwo != null) {
+            streamInstanceTwo.close();
+        }
+        if (streamInstanceOneRecovery != null) {
+            streamInstanceOneRecovery.close();
+        }
+    }
+
     @Test
     public void shouldSurviveWithOneTaskAsStandby() throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
@@ -129,21 +147,19 @@ public class StandbyTaskEOSIntegrationTest {
 
         final CountDownLatch instanceLatch = new CountDownLatch(1);
 
-        try (
-            final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch);
-            final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch)
-        ) {
-            startApplicationAndWaitUntilRunning(asList(streamInstanceOne, streamInstanceTwo), Duration.ofSeconds(60));
+        streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch);
+        streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch);
+        startApplicationAndWaitUntilRunning(asList(streamInstanceOne, streamInstanceTwo), Duration.ofSeconds(60));
 
-            // Wait for the record to be processed
-            assertTrue(instanceLatch.await(15, TimeUnit.SECONDS));
+        // Wait for the record to be processed
+        assertTrue(instanceLatch.await(15, TimeUnit.SECONDS));
 
-            streamInstanceOne.close(Duration.ZERO);
-            streamInstanceTwo.close(Duration.ZERO);
+        streamInstanceOne.close(Duration.ZERO);
+        streamInstanceTwo.close(Duration.ZERO);
+
+        streamInstanceOne.cleanUp();
+        streamInstanceTwo.cleanUp();
 
-            streamInstanceOne.cleanUp();
-            streamInstanceTwo.cleanUp();
-        }
     }
 
     private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
@@ -165,10 +181,10 @@ public class StandbyTaskEOSIntegrationTest {
 
         builder.stream(inputTopic,
                        Consumed.with(Serdes.Integer(), Serdes.Integer()))
-               .groupByKey()
-               .count()
-               .toStream()
-               .peek((key, value) -> recordProcessLatch.countDown());
+            .groupByKey()
+            .count()
+            .toStream()
+            .peek((key, value) -> recordProcessLatch.countDown());
 
         return new KafkaStreams(builder.build(), props);
     }
@@ -192,124 +208,123 @@ public class StandbyTaskEOSIntegrationTest {
             10L + time
         );
 
-        try (
-            final KafkaStreams streamInstanceOne = buildWithDeduplicationTopology(base + "-1");
-            final KafkaStreams streamInstanceTwo = buildWithDeduplicationTopology(base + "-2");
-            final KafkaStreams streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1")
-        ) {
-            // start first instance and wait for processing
-            startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30));
-            IntegrationTestUtils.waitUntilMinRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    IntegerDeserializer.class
-                ),
-                outputTopic,
-                1
-            );
-
-            // start second instance and wait for standby replication
-            startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30));
-            waitForCondition(
-                () -> streamInstanceTwo.store(
-                    StoreQueryParameters.fromNameAndType(
-                        storeName,
-                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
-                    ).enableStaleStores()
-                ).get(KEY_0) != null,
-                REBALANCE_TIMEOUT,
-                "Could not get key from standby store"
-            );
-            // sanity check that first instance is still active
-            waitForCondition(
-                () -> streamInstanceOne.store(
-                    StoreQueryParameters.fromNameAndType(
-                        storeName,
-                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
-                    )
-                ).get(KEY_0) != null,
-                "Could not get key from main store"
-            );
-
-            // inject poison pill and wait for crash of first instance and recovery on second instance
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic,
-                Collections.singletonList(
-                    new KeyValue<>(KEY_1, 0)
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    IntegerSerializer.class,
-                    new Properties()
-                ),
-                10L + time
-            );
-            waitForCondition(
-                () -> streamInstanceOne.state() == KafkaStreams.State.ERROR,
-                "Stream instance 1 did not go into error state"
-            );
-            streamInstanceOne.close();
+        streamInstanceOne = buildWithDeduplicationTopology(base + "-1");
+        streamInstanceTwo = buildWithDeduplicationTopology(base + "-2");
+
+        // start first instance and wait for processing
+        startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30));
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class
+            ),
+            outputTopic,
+            1
+        );
 
-            IntegrationTestUtils.waitUntilMinRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    IntegerDeserializer.class
-                ),
-                outputTopic,
-                2
-            );
-
-            // "restart" first client and wait for standby recovery
-            // (could actually also be active, but it does not matter as long as we enable "state stores"
-            startApplicationAndWaitUntilRunning(
-                Collections.singletonList(streamInstanceOneRecovery),
-                Duration.ofSeconds(30)
-            );
-            waitForCondition(
-                () -> streamInstanceOneRecovery.store(
-                    StoreQueryParameters.fromNameAndType(
-                        storeName,
-                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
-                    ).enableStaleStores()
-                ).get(KEY_0) != null,
-                "Could not get key from recovered standby store"
-            );
+        // start second instance and wait for standby replication
+        startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30));
+        waitForCondition(
+            () -> streamInstanceTwo.store(
+                StoreQueryParameters.fromNameAndType(
+                    storeName,
+                    QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                ).enableStaleStores()
+            ).get(KEY_0) != null,
+            REBALANCE_TIMEOUT,
+            "Could not get key from standby store"
+        );
+        // sanity check that first instance is still active
+        waitForCondition(
+            () -> streamInstanceOne.store(
+                StoreQueryParameters.fromNameAndType(
+                    storeName,
+                    QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                )
+            ).get(KEY_0) != null,
+            "Could not get key from main store"
+        );
 
-            streamInstanceTwo.close();
-            waitForCondition(
-                () -> streamInstanceOneRecovery.store(
-                    StoreQueryParameters.fromNameAndType(
-                        storeName,
-                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
-                    )
-                ).get(KEY_0) != null,
-                REBALANCE_TIMEOUT,
-                "Could not get key from recovered main store"
-            );
-
-            // re-inject poison pill and wait for crash of first instance
-            skipRecord.set(false);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic,
-                Collections.singletonList(
-                    new KeyValue<>(KEY_1, 0)
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    IntegerSerializer.class,
-                    new Properties()
-                ),
-                10L + time
-            );
-            waitForCondition(
-                () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR,
-                "Stream instance 1 did not go into error state"
-            );
-        }
+        // inject poison pill and wait for crash of first instance and recovery on second instance
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            Collections.singletonList(
+                new KeyValue<>(KEY_1, 0)
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class,
+                new Properties()
+            ),
+            10L + time
+        );
+        waitForCondition(
+            () -> streamInstanceOne.state() == KafkaStreams.State.ERROR,
+            "Stream instance 1 did not go into error state"
+        );
+        streamInstanceOne.close();
+
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class
+            ),
+            outputTopic,
+            2
+        );
+
+        streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1");
+
+        // "restart" first client and wait for standby recovery
+        // (could actually also be active, but it does not matter as long as we enable "state stores"
+        startApplicationAndWaitUntilRunning(
+            Collections.singletonList(streamInstanceOneRecovery),
+            Duration.ofSeconds(30)
+        );
+        waitForCondition(
+            () -> streamInstanceOneRecovery.store(
+                StoreQueryParameters.fromNameAndType(
+                    storeName,
+                    QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                ).enableStaleStores()
+            ).get(KEY_0) != null,
+            "Could not get key from recovered standby store"
+        );
+
+        streamInstanceTwo.close();
+        waitForCondition(
+            () -> streamInstanceOneRecovery.store(
+                StoreQueryParameters.fromNameAndType(
+                    storeName,
+                    QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                )
+            ).get(KEY_0) != null,
+            REBALANCE_TIMEOUT,
+            "Could not get key from recovered main store"
+        );
+
+        // re-inject poison pill and wait for crash of first instance
+        skipRecord.set(false);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            Collections.singletonList(
+                new KeyValue<>(KEY_1, 0)
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class,
+                new Properties()
+            ),
+            10L + time
+        );
+        waitForCondition(
+            () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR,
+            "Stream instance 1 did not go into error state"
+        );
     }
 
     private KafkaStreams buildWithDeduplicationTopology(final String stateDirPath) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 153f434..70082a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -518,8 +518,8 @@ public class StoreUpgradeIntegrationTest {
 
 
         shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(
-            new KafkaStreams(streamsBuilderForOldStore.build(), props()),
-            new KafkaStreams(streamsBuilderForNewStore.build(), props()),
+            streamsBuilderForOldStore,
+            streamsBuilderForNewStore,
             false);
     }
 
@@ -554,17 +554,17 @@ public class StoreUpgradeIntegrationTest {
             .<Integer, Integer>stream(inputStream)
             .process(TimestampedWindowedProcessor::new, STORE_NAME);
 
-        final Properties props = props();
         shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(
-            new KafkaStreams(streamsBuilderForOldStore.build(), props),
-            new KafkaStreams(streamsBuilderForNewStore.build(), props),
+            streamsBuilderForOldStore,
+            streamsBuilderForNewStore,
             true);
     }
 
-    private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final KafkaStreams kafkaStreamsOld,
-                                                                           final KafkaStreams kafkaStreamsNew,
+    private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final StreamsBuilder streamsBuilderForOldStore,
+                                                                           final StreamsBuilder streamsBuilderForNewStore,
                                                                            final boolean persistentStore) throws Exception {
-        kafkaStreams = kafkaStreamsOld;
+        final Properties props = props();
+        kafkaStreams =  new KafkaStreams(streamsBuilderForOldStore.build(), props);
         kafkaStreams.start();
 
         processWindowedKeyValueAndVerifyPlainCount(1, singletonList(
@@ -608,7 +608,7 @@ public class StoreUpgradeIntegrationTest {
         kafkaStreams = null;
 
 
-        kafkaStreams = kafkaStreamsNew;
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props);
         kafkaStreams.start();
 
         verifyWindowedCountWithTimestamp(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L, lastUpdateKeyOne);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 3de6a7d..bbe657f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -153,6 +153,17 @@ public class IntegrationTestUtils {
         }
     }
 
+    /**
+     * Removes local state stores. Useful to reset state in-between integration test runs.
+     *
+     * @param streamsConfigurations Streams configuration settings
+     */
+    public static void purgeLocalStreamsState(final Collection<Properties> streamsConfigurations) throws IOException {
+        for (final Properties streamsConfig : streamsConfigurations) {
+            purgeLocalStreamsState(streamsConfig);
+        }
+    }
+
     public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final String... topics) {
         cleanStateBeforeTest(cluster, 1, topics);
     }
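
Usage of the new overload mirrors the foreign-key join test above; a hedged fragment, where the three Properties objects stand for whatever per-instance configurations the calling test holds:

    import static java.util.Arrays.asList;

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

    // Hypothetical helper: wipe the local state of every configured instance in
    // one call, e.g. from a test's @Before method.
    final class StateCleanup {
        private StateCleanup() { }

        static void purgeAll(final Properties streamsConfig,
                             final Properties streamsConfigTwo,
                             final Properties streamsConfigThree) throws IOException {
            IntegrationTestUtils.purgeLocalStreamsState(
                asList(streamsConfig, streamsConfigTwo, streamsConfigThree));
        }
    }
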
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index f5f04dc..b0d77aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -29,10 +31,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -44,6 +50,7 @@ import java.util.EnumSet;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,6 +59,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_FILE_NAME;
+import static org.apache.kafka.streams.processor.internals.StateDirectory.PROCESS_FILE_NAME;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.endsWith;
@@ -644,6 +652,72 @@ public class StateDirectoryTest {
         }
     }
 
+    @Test
+    public void shouldPersistProcessIdAcrossRestart() {
+        final UUID processId = directory.initializeProcessId();
+        directory.close();
+        assertThat(directory.initializeProcessId(), equalTo(processId));
+    }
+
+    @Test
+    public void shouldGetFreshProcessIdIfProcessFileDeleted() {
+        final UUID processId = directory.initializeProcessId();
+        directory.close();
+
+        final File processFile = new File(appDir, PROCESS_FILE_NAME);
+        assertThat(processFile.exists(), is(true));
+        assertThat(processFile.delete(), is(true));
+
+        assertThat(directory.initializeProcessId(), not(processId));
+    }
+
+    @Test
+    public void shouldGetFreshProcessIdIfJsonUnreadable() throws Exception {
+        final File processFile = new File(appDir, PROCESS_FILE_NAME);
+        assertThat(processFile.createNewFile(), is(true));
+        final UUID processId = UUID.randomUUID();
+
+        final FileOutputStream fileOutputStream = new FileOutputStream(processFile);
+        try (final BufferedWriter writer = new BufferedWriter(
+            new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
+            writer.write(processId.toString());
+            writer.flush();
+            fileOutputStream.getFD().sync();
+        }
+
+        assertThat(directory.initializeProcessId(), not(processId));
+    }
+
+    @Test
+    public void shouldReadFutureProcessFileFormat() throws Exception {
+        final File processFile = new File(appDir, PROCESS_FILE_NAME);
+        final ObjectMapper mapper = new ObjectMapper();
+        final UUID processId = UUID.randomUUID();
+        mapper.writeValue(processFile, new FutureStateDirectoryProcessFile(processId, "some random junk"));
+
+        assertThat(directory.initializeProcessId(), equalTo(processId));
+    }
+
+    private static class FutureStateDirectoryProcessFile {
+
+        @JsonProperty
+        private final UUID processId;
+
+        @JsonProperty
+        private final String newField;
+
+        public FutureStateDirectoryProcessFile() {
+            this.processId = null;
+            this.newField = null;
+        }
+
+        FutureStateDirectoryProcessFile(final UUID processId, final String newField) {
+            this.processId = processId;
+            this.newField = newField;
+
+        }
+    }
+
     private static class CreateTaskDirRunner implements Runnable {
         private final StateDirectory directory;
         private final TaskId taskId;
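
The shouldReadFutureProcessFileFormat test above hinges on Jackson ignoring unknown properties on the reading side, which is what makes the "only ever add fields" rule in StateDirectory safe. A standalone sketch of that behavior (class names illustrative):

    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.UUID;

    public class ForwardCompatSketch {

        // "Old" reader: knows only processId but tolerates extra fields.
        @JsonIgnoreProperties(ignoreUnknown = true)
        static class V1 {
            public UUID processId;
        }

        public static void main(final String[] args) throws Exception {
            final UUID id = UUID.randomUUID();
            // JSON as written by a hypothetical future version with an extra field.
            final String futureJson =
                "{\"processId\":\"" + id + "\",\"newField\":\"some random junk\"}";

            final V1 decoded = new ObjectMapper().readValue(futureJson, V1.class);
            // The unknown field is skipped and the process id survives intact.
            System.out.println(id.equals(decoded.processId));   // prints: true
        }
    }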