You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/02/05 19:26:25 UTC
[kafka] branch 2.7 updated: KAFKA-10716: persist UUID in state
directory for stable processId across restarts - 2.7 (#10060)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 62a58e1 KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7 (#10060)
62a58e1 is described below
commit 62a58e1b2807c7983819b4a45b5a73cbdadedc7b
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Feb 5 11:24:52 2021 -0800
KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7 (#10060)
To stabilize the task assignment across restarts of the JVM, we need some way to persist the process-specific UUID. We can write it to a file in the state directory and initialize it from there on startup, or create a new one if no prior UUID exists. This is a port of PR #9978 to the 2.7 branch.
Reviewers: Walker Carlson <wc...@confluent.io>, Leah Thomas <lt...@confluent.io>
---
.../org/apache/kafka/streams/KafkaStreams.java | 39 +--
.../processor/internals/StateDirectory.java | 104 ++++++++
.../org/apache/kafka/streams/KafkaStreamsTest.java | 3 +
.../KStreamRepartitionIntegrationTest.java | 20 +-
...bleForeignKeyInnerJoinMultiIntegrationTest.java | 45 ++--
.../integration/StandbyTaskEOSIntegrationTest.java | 279 +++++++++++----------
.../integration/StoreUpgradeIntegrationTest.java | 18 +-
.../integration/utils/IntegrationTestUtils.java | 11 +
.../processor/internals/StateDirectoryTest.java | 74 ++++++
9 files changed, 414 insertions(+), 179 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b5e64e8..7ec6a0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -660,8 +660,25 @@ public class KafkaStreams implements AutoCloseable {
this.config = config;
this.time = time;
- // The application ID is a required config and hence should always have value
- final UUID processId = UUID.randomUUID();
+ // re-write the physical topology according to the config
+ internalTopologyBuilder.rewriteTopology(config);
+
+ // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
+ taskTopology = internalTopologyBuilder.buildTopology();
+ globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
+ final boolean hasGlobalTopology = globalTaskTopology != null;
+ final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
+ (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
+
+ final UUID processId;
+ try {
+ stateDirectory = new StateDirectory(config, time, hasPersistentStores);
+ processId = stateDirectory.initializeProcessId();
+ } catch (final ProcessorStateException fatal) {
+ throw new StreamsException(fatal);
+ }
+
final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
if (userClientId.length() <= 0) {
@@ -700,12 +717,6 @@ public class KafkaStreams implements AutoCloseable {
log.info("Kafka Streams version: {}", ClientMetrics.version());
log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
- // re-write the physical topology according to the config
- internalTopologyBuilder.rewriteTopology(config);
-
- // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
- taskTopology = internalTopologyBuilder.buildTopology();
-
streamsMetadataState = new StreamsMetadataState(
internalTopologyBuilder,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
@@ -721,9 +732,6 @@ public class KafkaStreams implements AutoCloseable {
// create the stream thread, global update thread, and cleanup thread
threads = new StreamThread[numStreamThreads];
- globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
- final boolean hasGlobalTopology = globalTaskTopology != null;
-
if (numStreamThreads == 0 && !hasGlobalTopology) {
log.error("Topology with no input topics will create no stream threads and no global thread.");
throw new TopologyException("Topology has no stream threads and no global threads, " +
@@ -736,14 +744,6 @@ public class KafkaStreams implements AutoCloseable {
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
}
final long cacheSizePerThread = totalCacheSize / (threads.length + (hasGlobalTopology ? 1 : 0));
- final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
- (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
-
- try {
- stateDirectory = new StateDirectory(config, time, hasPersistentStores);
- } catch (final ProcessorStateException fatal) {
- throw new StreamsException(fatal);
- }
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
GlobalStreamThread.State globalThreadState = null;
@@ -977,6 +977,7 @@ public class KafkaStreams implements AutoCloseable {
globalStreamThread = null;
}
+ stateDirectory.close();
adminClient.close();
streamsMetrics.removeAllClientLevelMetrics();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index e388b6a..97d0865 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@@ -39,6 +42,7 @@ import java.nio.file.attribute.PosixFilePermissions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
+import java.util.UUID;
import java.util.regex.Pattern;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
@@ -54,6 +58,25 @@ public class StateDirectory {
private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
static final String LOCK_FILE_NAME = ".lock";
+ /* The process file is used to persist the process id across restarts.
+ * For compatibility reasons you should only ever add fields to the json schema
+ */
+ static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata";
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ static class StateDirectoryProcessFile {
+ @JsonProperty
+ private final UUID processId;
+
+ public StateDirectoryProcessFile() {
+ this.processId = null;
+ }
+
+ StateDirectoryProcessFile(final UUID processId) {
+ this.processId = processId;
+ }
+ }
+
private final Object taskDirCreationLock = new Object();
private final Time time;
private final String appId;
@@ -62,6 +85,9 @@ public class StateDirectory {
private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
+ private FileChannel stateDirLockChannel;
+ private FileLock stateDirLock;
+
private FileChannel globalStateChannel;
private FileLock globalStateLock;
@@ -134,6 +160,61 @@ public class StateDirectory {
}
/**
+ * @return true if the state directory was successfully locked
+ */
+ private boolean lockStateDirectory() {
+ final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+ try {
+ stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+ stateDirLock = tryLock(stateDirLockChannel);
+ } catch (final IOException e) {
+ log.error("Unable to lock the state directory due to unexpected exception", e);
+ throw new ProcessorStateException("Failed to lock the state directory during startup", e);
+ }
+
+ return stateDirLock != null;
+ }
+
+ public UUID initializeProcessId() {
+ if (!hasPersistentStores) {
+ return UUID.randomUUID();
+ }
+
+ if (!lockStateDirectory()) {
+ log.error("Unable to obtain lock as state directory is already locked by another process");
+ throw new StreamsException("Unable to initialize state, this can happen if multiple instances of " +
+ "Kafka Streams are running in the same state directory");
+ }
+
+ final File processFile = new File(stateDir, PROCESS_FILE_NAME);
+ final ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ if (processFile.exists()) {
+ try {
+ final StateDirectoryProcessFile processFileData = mapper.readValue(processFile, StateDirectoryProcessFile.class);
+ log.info("Reading UUID from process file: {}", processFileData.processId);
+ if (processFileData.processId != null) {
+ return processFileData.processId;
+ }
+ } catch (final Exception e) {
+ log.warn("Failed to read json process file", e);
+ }
+ }
+
+ final StateDirectoryProcessFile processFileData = new StateDirectoryProcessFile(UUID.randomUUID());
+ log.info("No process id found on disk, got fresh process id {}", processFileData.processId);
+
+ mapper.writeValue(processFile, processFileData);
+ return processFileData.processId;
+ } catch (final IOException e) {
+ log.error("Unable to read/write process file due to unexpected exception", e);
+ throw new ProcessorStateException(e);
+ }
+ }
+
+
+ /**
* Get or create the directory for the provided {@link TaskId}.
* @return directory for the {@link TaskId}
* @throws ProcessorStateException if the task directory does not exists and could not be created
@@ -310,6 +391,29 @@ public class StateDirectory {
}
}
+ public void close() {
+ if (hasPersistentStores) {
+ try {
+ stateDirLock.release();
+ stateDirLockChannel.close();
+
+ stateDirLock = null;
+ stateDirLockChannel = null;
+ } catch (final IOException e) {
+ log.error("Unexpected exception while unlocking the state dir", e);
+ throw new StreamsException("Failed to release the lock on the state directory", e);
+ }
+
+ // all threads should be stopped and cleaned up by now, so none should remain holding a lock
+ if (locks.isEmpty()) {
+ log.error("Some task directories still locked while closing state, this indicates unclean shutdown: {}", locks);
+ }
+ if (globalStateLock != null) {
+ log.error("Global state lock is present while closing the state, this indicates unclean shutdown");
+ }
+ }
+ }
+
public synchronized void clean() {
// remove task dirs
try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index f259696..6e6a3ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -830,6 +830,8 @@ public class KafkaStreamsTest {
anyObject(Time.class),
EasyMock.eq(true)
).andReturn(stateDirectory);
+ EasyMock.expect(stateDirectory.initializeProcessId()).andReturn(UUID.randomUUID());
+ stateDirectory.close();
PowerMock.replayAll(Executors.class, cleanupSchedule, stateDirectory);
props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
@@ -999,6 +1001,7 @@ public class KafkaStreamsTest {
anyObject(Time.class),
EasyMock.eq(shouldFilesExist)
).andReturn(stateDirectory);
+ EasyMock.expect(stateDirectory.initializeProcessId()).andReturn(UUID.randomUUID());
PowerMock.replayAll();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index a12bb85..ee1541d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -622,8 +622,10 @@ public class KStreamRepartitionIntegrationTest {
.to(outputTopic);
startStreams(builder);
- final KafkaStreams kafkaStreamsToClose = startStreams(builder);
-
+ final Properties streamsToCloseConfigs = new Properties();
+ streamsToCloseConfigs.putAll(streamsConfiguration);
+ streamsToCloseConfigs.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + "-2");
+ final KafkaStreams kafkaStreamsToClose = startStreams(builder, streamsToCloseConfigs);
validateReceivedMessages(
new StringDeserializer(),
new LongDeserializer(),
@@ -724,12 +726,24 @@ public class KStreamRepartitionIntegrationTest {
}
private KafkaStreams startStreams(final StreamsBuilder builder) throws InterruptedException {
- return startStreams(builder, REBALANCING, RUNNING, null);
+ return startStreams(builder, REBALANCING, RUNNING, streamsConfiguration, null);
+ }
+
+ private KafkaStreams startStreams(final StreamsBuilder builder, final Properties streamsConfiguration) throws InterruptedException {
+ return startStreams(builder, REBALANCING, RUNNING, streamsConfiguration, null);
+ }
+
+ private KafkaStreams startStreams(final StreamsBuilder builder,
+ final State expectedOldState,
+ final State expectedNewState,
+ final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws InterruptedException {
+ return startStreams(builder, expectedOldState, expectedNewState, streamsConfiguration, uncaughtExceptionHandler);
}
private KafkaStreams startStreams(final StreamsBuilder builder,
final State expectedOldState,
final State expectedNewState,
+ final Properties streamsConfiguration,
final UncaughtExceptionHandler uncaughtExceptionHandler) throws InterruptedException {
final CountDownLatch latch;
final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index eeac500..b52b638 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -50,7 +50,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -58,6 +57,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
+import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
@Category({IntegrationTest.class})
@@ -71,7 +71,9 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
private final static String TABLE_2 = "table2";
private final static String TABLE_3 = "table3";
private final static String OUTPUT = "output-";
- private static Properties streamsConfig;
+ private final Properties streamsConfig = getStreamsConfig();
+ private final Properties streamsConfigTwo = getStreamsConfig();
+ private final Properties streamsConfigThree = getStreamsConfig();
private KafkaStreams streams;
private KafkaStreams streamsTwo;
private KafkaStreams streamsThree;
@@ -105,14 +107,8 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
PRODUCER_CONFIG_3.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
PRODUCER_CONFIG_3.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- streamsConfig = new Properties();
- streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
- streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
- final List<KeyValue<Integer, Float>> table1 = Arrays.asList(
+ final List<KeyValue<Integer, Float>> table1 = asList(
new KeyValue<>(1, 1.33f),
new KeyValue<>(2, 2.22f),
new KeyValue<>(3, -1.22f), //Won't be joined in yet.
@@ -120,7 +116,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
);
//Partitions pre-computed using the default Murmur2 hash, just to ensure that all 3 partitions will be exercised.
- final List<KeyValue<String, Long>> table2 = Arrays.asList(
+ final List<KeyValue<String, Long>> table2 = asList(
new KeyValue<>("0", 0L), //partition 2
new KeyValue<>("1", 10L), //partition 0
new KeyValue<>("2", 20L), //partition 2
@@ -150,7 +146,12 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
@Before
public void before() throws IOException {
- IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+ final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+ streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
+ streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-2");
+ streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
+
+ IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree));
}
@After
@@ -187,11 +188,10 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
final boolean verifyQueryableState) throws Exception {
final String queryableName = verifyQueryableState ? joinType + "-store1" : null;
final String queryableNameTwo = verifyQueryableState ? joinType + "-store2" : null;
- streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType + queryableName);
- streams = prepareTopology(queryableName, queryableNameTwo);
- streamsTwo = prepareTopology(queryableName, queryableNameTwo);
- streamsThree = prepareTopology(queryableName, queryableNameTwo);
+ streams = prepareTopology(queryableName, queryableNameTwo, streamsConfig);
+ streamsTwo = prepareTopology(queryableName, queryableNameTwo, streamsConfigTwo);
+ streamsThree = prepareTopology(queryableName, queryableNameTwo, streamsConfigThree);
streams.start();
streamsTwo.start();
streamsThree.start();
@@ -204,7 +204,20 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
assertEquals(expectedResult, result);
}
- private KafkaStreams prepareTopology(final String queryableName, final String queryableNameTwo) {
+ private static Properties getStreamsConfig() {
+ final Properties streamsConfig = new Properties();
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Multi");
+ streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ return streamsConfig;
+ }
+
+ private static KafkaStreams prepareTopology(final String queryableName,
+ final String queryableNameTwo,
+ final Properties streamsConfig) {
+
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index 93b4f2f..e456990 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -91,6 +92,10 @@ public class StandbyTaskEOSIntegrationTest {
private String storeName;
private String outputTopic;
+ private KafkaStreams streamInstanceOne;
+ private KafkaStreams streamInstanceTwo;
+ private KafkaStreams streamInstanceOneRecovery;
+
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@@ -109,6 +114,19 @@ public class StandbyTaskEOSIntegrationTest {
CLUSTER.createTopic(outputTopic, 1, 3);
}
+ @After
+ public void cleanUp() {
+ if (streamInstanceOne != null) {
+ streamInstanceOne.close();
+ }
+ if (streamInstanceTwo != null) {
+ streamInstanceTwo.close();
+ }
+ if (streamInstanceOneRecovery != null) {
+ streamInstanceOneRecovery.close();
+ }
+ }
+
@Test
public void shouldSurviveWithOneTaskAsStandby() throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
@@ -129,21 +147,19 @@ public class StandbyTaskEOSIntegrationTest {
final CountDownLatch instanceLatch = new CountDownLatch(1);
- try (
- final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch);
- final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch)
- ) {
- startApplicationAndWaitUntilRunning(asList(streamInstanceOne, streamInstanceTwo), Duration.ofSeconds(60));
+ streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch);
+ streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch);
+ startApplicationAndWaitUntilRunning(asList(streamInstanceOne, streamInstanceTwo), Duration.ofSeconds(60));
- // Wait for the record to be processed
- assertTrue(instanceLatch.await(15, TimeUnit.SECONDS));
+ // Wait for the record to be processed
+ assertTrue(instanceLatch.await(15, TimeUnit.SECONDS));
- streamInstanceOne.close(Duration.ZERO);
- streamInstanceTwo.close(Duration.ZERO);
+ streamInstanceOne.close(Duration.ZERO);
+ streamInstanceTwo.close(Duration.ZERO);
+
+ streamInstanceOne.cleanUp();
+ streamInstanceTwo.cleanUp();
- streamInstanceOne.cleanUp();
- streamInstanceTwo.cleanUp();
- }
}
private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
@@ -165,10 +181,10 @@ public class StandbyTaskEOSIntegrationTest {
builder.stream(inputTopic,
Consumed.with(Serdes.Integer(), Serdes.Integer()))
- .groupByKey()
- .count()
- .toStream()
- .peek((key, value) -> recordProcessLatch.countDown());
+ .groupByKey()
+ .count()
+ .toStream()
+ .peek((key, value) -> recordProcessLatch.countDown());
return new KafkaStreams(builder.build(), props);
}
@@ -192,124 +208,123 @@ public class StandbyTaskEOSIntegrationTest {
10L + time
);
- try (
- final KafkaStreams streamInstanceOne = buildWithDeduplicationTopology(base + "-1");
- final KafkaStreams streamInstanceTwo = buildWithDeduplicationTopology(base + "-2");
- final KafkaStreams streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1")
- ) {
- // start first instance and wait for processing
- startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30));
- IntegrationTestUtils.waitUntilMinRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- IntegerDeserializer.class,
- IntegerDeserializer.class
- ),
- outputTopic,
- 1
- );
-
- // start second instance and wait for standby replication
- startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30));
- waitForCondition(
- () -> streamInstanceTwo.store(
- StoreQueryParameters.fromNameAndType(
- storeName,
- QueryableStoreTypes.<Integer, Integer>keyValueStore()
- ).enableStaleStores()
- ).get(KEY_0) != null,
- REBALANCE_TIMEOUT,
- "Could not get key from standby store"
- );
- // sanity check that first instance is still active
- waitForCondition(
- () -> streamInstanceOne.store(
- StoreQueryParameters.fromNameAndType(
- storeName,
- QueryableStoreTypes.<Integer, Integer>keyValueStore()
- )
- ).get(KEY_0) != null,
- "Could not get key from main store"
- );
-
- // inject poison pill and wait for crash of first instance and recovery on second instance
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- inputTopic,
- Collections.singletonList(
- new KeyValue<>(KEY_1, 0)
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- IntegerSerializer.class,
- new Properties()
- ),
- 10L + time
- );
- waitForCondition(
- () -> streamInstanceOne.state() == KafkaStreams.State.ERROR,
- "Stream instance 1 did not go into error state"
- );
- streamInstanceOne.close();
+ streamInstanceOne = buildWithDeduplicationTopology(base + "-1");
+ streamInstanceTwo = buildWithDeduplicationTopology(base + "-2");
+
+ // start first instance and wait for processing
+ startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30));
+ IntegrationTestUtils.waitUntilMinRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class
+ ),
+ outputTopic,
+ 1
+ );
- IntegrationTestUtils.waitUntilMinRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- IntegerDeserializer.class,
- IntegerDeserializer.class
- ),
- outputTopic,
- 2
- );
-
- // "restart" first client and wait for standby recovery
- // (could actually also be active, but it does not matter as long as we enable "state stores"
- startApplicationAndWaitUntilRunning(
- Collections.singletonList(streamInstanceOneRecovery),
- Duration.ofSeconds(30)
- );
- waitForCondition(
- () -> streamInstanceOneRecovery.store(
- StoreQueryParameters.fromNameAndType(
- storeName,
- QueryableStoreTypes.<Integer, Integer>keyValueStore()
- ).enableStaleStores()
- ).get(KEY_0) != null,
- "Could not get key from recovered standby store"
- );
+ // start second instance and wait for standby replication
+ startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30));
+ waitForCondition(
+ () -> streamInstanceTwo.store(
+ StoreQueryParameters.fromNameAndType(
+ storeName,
+ QueryableStoreTypes.<Integer, Integer>keyValueStore()
+ ).enableStaleStores()
+ ).get(KEY_0) != null,
+ REBALANCE_TIMEOUT,
+ "Could not get key from standby store"
+ );
+ // sanity check that first instance is still active
+ waitForCondition(
+ () -> streamInstanceOne.store(
+ StoreQueryParameters.fromNameAndType(
+ storeName,
+ QueryableStoreTypes.<Integer, Integer>keyValueStore()
+ )
+ ).get(KEY_0) != null,
+ "Could not get key from main store"
+ );
- streamInstanceTwo.close();
- waitForCondition(
- () -> streamInstanceOneRecovery.store(
- StoreQueryParameters.fromNameAndType(
- storeName,
- QueryableStoreTypes.<Integer, Integer>keyValueStore()
- )
- ).get(KEY_0) != null,
- REBALANCE_TIMEOUT,
- "Could not get key from recovered main store"
- );
-
- // re-inject poison pill and wait for crash of first instance
- skipRecord.set(false);
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- inputTopic,
- Collections.singletonList(
- new KeyValue<>(KEY_1, 0)
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- IntegerSerializer.class,
- new Properties()
- ),
- 10L + time
- );
- waitForCondition(
- () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR,
- "Stream instance 1 did not go into error state"
- );
- }
+ // inject poison pill and wait for crash of first instance and recovery on second instance
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic,
+ Collections.singletonList(
+ new KeyValue<>(KEY_1, 0)
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ IntegerSerializer.class,
+ new Properties()
+ ),
+ 10L + time
+ );
+ waitForCondition(
+ () -> streamInstanceOne.state() == KafkaStreams.State.ERROR,
+ "Stream instance 1 did not go into error state"
+ );
+ streamInstanceOne.close();
+
+ IntegrationTestUtils.waitUntilMinRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class
+ ),
+ outputTopic,
+ 2
+ );
+
+ streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1");
+
+ // "restart" first client and wait for standby recovery
+ // (could actually also be active, but it does not matter as long as we enable "state stores"
+ startApplicationAndWaitUntilRunning(
+ Collections.singletonList(streamInstanceOneRecovery),
+ Duration.ofSeconds(30)
+ );
+ waitForCondition(
+ () -> streamInstanceOneRecovery.store(
+ StoreQueryParameters.fromNameAndType(
+ storeName,
+ QueryableStoreTypes.<Integer, Integer>keyValueStore()
+ ).enableStaleStores()
+ ).get(KEY_0) != null,
+ "Could not get key from recovered standby store"
+ );
+
+ streamInstanceTwo.close();
+ waitForCondition(
+ () -> streamInstanceOneRecovery.store(
+ StoreQueryParameters.fromNameAndType(
+ storeName,
+ QueryableStoreTypes.<Integer, Integer>keyValueStore()
+ )
+ ).get(KEY_0) != null,
+ REBALANCE_TIMEOUT,
+ "Could not get key from recovered main store"
+ );
+
+ // re-inject poison pill and wait for crash of first instance
+ skipRecord.set(false);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic,
+ Collections.singletonList(
+ new KeyValue<>(KEY_1, 0)
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ IntegerSerializer.class,
+ new Properties()
+ ),
+ 10L + time
+ );
+ waitForCondition(
+ () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR,
+ "Stream instance 1 did not go into error state"
+ );
}
private KafkaStreams buildWithDeduplicationTopology(final String stateDirPath) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 153f434..70082a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -518,8 +518,8 @@ public class StoreUpgradeIntegrationTest {
shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(
- new KafkaStreams(streamsBuilderForOldStore.build(), props()),
- new KafkaStreams(streamsBuilderForNewStore.build(), props()),
+ streamsBuilderForOldStore,
+ streamsBuilderForNewStore,
false);
}
@@ -554,17 +554,17 @@ public class StoreUpgradeIntegrationTest {
.<Integer, Integer>stream(inputStream)
.process(TimestampedWindowedProcessor::new, STORE_NAME);
- final Properties props = props();
shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(
- new KafkaStreams(streamsBuilderForOldStore.build(), props),
- new KafkaStreams(streamsBuilderForNewStore.build(), props),
+ streamsBuilderForOldStore,
+ streamsBuilderForNewStore,
true);
}
- private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final KafkaStreams kafkaStreamsOld,
- final KafkaStreams kafkaStreamsNew,
+ private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final StreamsBuilder streamsBuilderForOldStore,
+ final StreamsBuilder streamsBuilderForNewStore,
final boolean persistentStore) throws Exception {
- kafkaStreams = kafkaStreamsOld;
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
kafkaStreams.start();
processWindowedKeyValueAndVerifyPlainCount(1, singletonList(
@@ -608,7 +608,7 @@ public class StoreUpgradeIntegrationTest {
kafkaStreams = null;
- kafkaStreams = kafkaStreamsNew;
+ kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props);
kafkaStreams.start();
verifyWindowedCountWithTimestamp(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L, lastUpdateKeyOne);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 3de6a7d..bbe657f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -153,6 +153,17 @@ public class IntegrationTestUtils {
}
}
+ /**
+ * Removes local state stores for every configuration in the given collection by calling
+ * {@code purgeLocalStreamsState(Properties)} once per entry, in the collection's iteration order.
+ * Useful to reset state in-between integration test runs.
+ *
+ * @param streamsConfigurations Streams configuration settings, one {@link Properties} per purge call
+ * @throws IOException declared on this method; presumably propagated from the single-configuration
+ * overload (its body is not shown here)
+ */
+ public static void purgeLocalStreamsState(final Collection<Properties> streamsConfigurations) throws IOException {
+ for (final Properties streamsConfig : streamsConfigurations) {
+ purgeLocalStreamsState(streamsConfig);
+ }
+ }
+
public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final String... topics) {
cleanStateBeforeTest(cluster, 1, topics);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index f5f04dc..b0d77aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
@@ -29,10 +31,14 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@@ -44,6 +50,7 @@ import java.util.EnumSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,6 +59,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_FILE_NAME;
+import static org.apache.kafka.streams.processor.internals.StateDirectory.PROCESS_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.endsWith;
@@ -644,6 +652,72 @@ public class StateDirectoryTest {
}
}
+ // Initializes the process id, closes the directory, then initializes again and expects the
+ // second call to return the same UUID as the first.
+ @Test
+ public void shouldPersistProcessIdAcrossRestart() {
+ final UUID processId = directory.initializeProcessId();
+ // close() stands in for the "restart" named by the test; the same StateDirectory instance is reused.
+ directory.close();
+ assertThat(directory.initializeProcessId(), equalTo(processId));
+ }
+
+ // Initializes the process id, closes the directory, deletes the process file from appDir, and
+ // expects the next initializeProcessId() call to return a different UUID.
+ @Test
+ public void shouldGetFreshProcessIdIfProcessFileDeleted() {
+ final UUID processId = directory.initializeProcessId();
+ directory.close();
+
+ // The file must exist after the first initialization, and the delete must succeed,
+ // otherwise the final assertion would not be exercising the "file missing" path.
+ final File processFile = new File(appDir, PROCESS_FILE_NAME);
+ assertThat(processFile.exists(), is(true));
+ assertThat(processFile.delete(), is(true));
+
+ assertThat(directory.initializeProcessId(), not(processId));
+ }
+
+ // Pre-creates the process file with a bare UUID string as its only content and expects
+ // initializeProcessId() to return a different UUID rather than the one written.
+ @Test
+ public void shouldGetFreshProcessIdIfJsonUnreadable() throws Exception {
+ final File processFile = new File(appDir, PROCESS_FILE_NAME);
+ assertThat(processFile.createNewFile(), is(true));
+ final UUID processId = UUID.randomUUID();
+
+ // NOTE(review): the plain toString() form is presumably not parseable by the reader (per the
+ // test name) -- confirm against StateDirectory. Closing the writer also closes fileOutputStream.
+ final FileOutputStream fileOutputStream = new FileOutputStream(processFile);
+ try (final BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
+ writer.write(processId.toString());
+ // Flush the buffered characters first, then sync the descriptor, so the bytes are
+ // written out before the file is read back below.
+ writer.flush();
+ fileOutputStream.getFD().sync();
+ }
+
+ assertThat(directory.initializeProcessId(), not(processId));
+ }
+
+ // Serializes a FutureStateDirectoryProcessFile (processId plus one extra String field) into the
+ // process file with Jackson and expects initializeProcessId() to return the written UUID.
+ @Test
+ public void shouldReadFutureProcessFileFormat() throws Exception {
+ final File processFile = new File(appDir, PROCESS_FILE_NAME);
+ final ObjectMapper mapper = new ObjectMapper();
+ final UUID processId = UUID.randomUUID();
+ // The second argument populates the extra field that the test name treats as a "future" format addition.
+ mapper.writeValue(processFile, new FutureStateDirectoryProcessFile(processId, "some random junk"));
+
+ assertThat(directory.initializeProcessId(), equalTo(processId));
+ }
+
+ // Test-only value holder written to the process file by shouldReadFutureProcessFileFormat.
+ // It carries a processId plus one additional field, both exposed to Jackson via @JsonProperty.
+ private static class FutureStateDirectoryProcessFile {
+
+ @JsonProperty
+ private final UUID processId;
+
+ // Extra field beyond processId; its value is arbitrary in the test that uses this class.
+ @JsonProperty
+ private final String newField;
+
+ // No-arg constructor leaving both fields null. NOTE(review): presumably kept for Jackson
+ // deserialization; the visible test only serializes this class -- confirm it is needed.
+ public FutureStateDirectoryProcessFile() {
+ this.processId = null;
+ this.newField = null;
+ }
+
+ // Creates an instance with both fields set to the given values.
+ FutureStateDirectoryProcessFile(final UUID processId, final String newField) {
+ this.processId = processId;
+ this.newField = newField;
+
+ }
+ }
+
private static class CreateTaskDirRunner implements Runnable {
private final StateDirectory directory;
private final TaskId taskId;