You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by dc...@apache.org on 2021/12/22 19:41:13 UTC
[samza] branch master updated: Fix rollback stale checkpoints (#1571)
This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c125b15 Fix rollback stale checkpoints (#1571)
c125b15 is described below
commit c125b15142c921c2276508ae0322d3ea508bb136
Author: Daniel Chen <dc...@apache.org>
AuthorDate: Wed Dec 22 14:39:18 2021 -0500
Fix rollback stale checkpoints (#1571)
* Fix stale v2 checkpoints on rollback
* Fix checkstyle
* fixed liveCheckpointMaxAgeMillis
Co-authored-by: dxichen <da...@gmail.com>
---
.../java/org/apache/samza/config/TaskConfig.java | 6 +++
.../checkpoint/kafka/KafkaCheckpointManager.scala | 27 ++++++++----
.../CheckpointVersionIntegrationTest.java | 48 ++++++++++++++++++----
3 files changed, 64 insertions(+), 17 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 06a8727..a2b96eb 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -127,6 +127,8 @@ public class TaskConfig extends MapConfig {
// checkpoint version to read during container startup
public static final String CHECKPOINT_READ_VERSIONS = "task.checkpoint.read.versions";
public static final List<String> DEFAULT_CHECKPOINT_READ_VERSIONS = ImmutableList.of("1");
+ public static final String LIVE_CHECKPOINT_MAX_AGE_MS = "task.live.checkpoint.max.age";
+ public static final long DEFAULT_LIVE_CHECKPOINT_MAX_AGE_MS = 600000L; // 10 mins
public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
@@ -360,6 +362,10 @@ public class TaskConfig extends MapConfig {
}
}
+ public long getLiveCheckpointMaxAgeMillis() {
+ return getLong(LIVE_CHECKPOINT_MAX_AGE_MS, DEFAULT_LIVE_CHECKPOINT_MAX_AGE_MS);
+ }
+
public boolean getTransactionalStateCheckpointEnabled() {
return getBoolean(TRANSACTIONAL_STATE_CHECKPOINT_ENABLED, DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED);
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 7dbb9b3..7793b56 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -82,6 +82,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
val stopConsumerAfterFirstRead: Boolean = new TaskConfig(config).getCheckpointManagerConsumerStopAfterFirstRead
val checkpointReadVersions: util.List[lang.Short] = new TaskConfig(config).getCheckpointReadVersions
+ val liveCheckpointMaxAgeMillis: Long = new TaskConfig(config).getLiveCheckpointMaxAgeMillis
+
/**
* Create checkpoint stream prior to start.
@@ -243,12 +245,15 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
*/
private def readCheckpoints(): Map[TaskName, Checkpoint] = {
val checkpoints = mutable.Map[TaskName, Checkpoint]()
+ val checkpointAppendTime = mutable.Map[TaskName, Long]()
val iterator = new SystemStreamPartitionIterator(systemConsumer, checkpointSsp)
var numMessagesRead = 0
while (iterator.hasNext) {
val checkpointEnvelope: IncomingMessageEnvelope = iterator.next
+ // Kafka log append time for the checkpoint message
+ val checkpointEnvelopeTs = checkpointEnvelope.getEventTime;
val offset = checkpointEnvelope.getOffset
numMessagesRead += 1
@@ -290,9 +295,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
// if checkpoint key version does not match configured checkpoint version to read, skip the message.
if (checkpointReadVersions.contains(
KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(checkpointKey.getType))) {
- if (!checkpoints.contains(checkpointKey.getTaskName) ||
- shouldOverrideCheckpoint(checkpoints.get(checkpointKey.getTaskName), checkpointKey)) {
- checkpoints.put(checkpointKey.getTaskName, deserializeCheckpoint(checkpointKey, msgBytes))
+ val taskName = checkpointKey.getTaskName
+ if (!checkpoints.contains(taskName) ||
+ shouldOverrideCheckpoint(checkpoints.get(taskName), checkpointKey, checkpointAppendTime.get(taskName),
+ checkpointEnvelopeTs)) {
+ checkpoints.put(taskName, deserializeCheckpoint(checkpointKey, msgBytes))
+ checkpointAppendTime.put(taskName, checkpointEnvelopeTs)
} // else ignore the de-prioritized checkpoint
} else {
// Ignore and skip the unknown checkpoint key type. We do not want to throw any exceptions for this case
@@ -375,19 +383,22 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
}
}
- private def shouldOverrideCheckpoint(currentCheckpoint: Option[Checkpoint],
- newCheckpointKey: KafkaCheckpointLogKey): Boolean = {
+ private def shouldOverrideCheckpoint(currentCheckpoint: Option[Checkpoint], newCheckpointKey: KafkaCheckpointLogKey,
+ currentCheckpointAppendTime: Option[Long], newCheckpointAppendTime: Long): Boolean = {
val newCheckpointVersion = KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(newCheckpointKey.getType)
if (newCheckpointVersion == null) {
// Unknown checkpoint version
throw new IllegalArgumentException("Unknown checkpoint key type: " + newCheckpointKey.getType +
" for checkpoint key: " + newCheckpointKey)
}
- // Override checkpoint if the current checkpoint does not exist or if new checkpoint has a higher restore
- // priority than the currently written checkpoint
+ // Override checkpoint if any of the following holds:
+ // 1. The current checkpoint does not exist, or
+ // 2. The new checkpoint has the same or a higher restore priority than the currently written checkpoint, or
+ // 3. The current checkpoint is stale: the new checkpoint was appended more than liveCheckpointMaxAgeMillis after it
currentCheckpoint.isEmpty ||
checkpointReadVersions.indexOf(newCheckpointVersion) <=
- checkpointReadVersions.indexOf(currentCheckpoint.get.getVersion)
+ checkpointReadVersions.indexOf(currentCheckpoint.get.getVersion) ||
+ (newCheckpointAppendTime - currentCheckpointAppendTime.get > liveCheckpointMaxAgeMillis)
}
private def deserializeCheckpoint(checkpointKey: KafkaCheckpointLogKey, checkpointMsgBytes: Array[Byte]): Checkpoint = {
diff --git a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
index 66eb79c..2b6f045 100644
--- a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
@@ -77,7 +77,7 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
// double check collectors.flush
List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
- initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
+ runStatefulApp(inputMessagesOnInitialRun, inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun, CONFIGS);
// first two are reverts for uncommitted messages from last run for keys 98 and 99
List<String> expectedChangelogMessagesAfterSecondRun =
@@ -85,11 +85,42 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
Map<String, String> configOverrides = new HashMap<>(CONFIGS);
configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2");
- secondRun(CHANGELOG_TOPIC,
- expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, configOverrides);
+ finalRun(CHANGELOG_TOPIC,
+ expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun,
+ Arrays.asList("4", "5", "5", ":shutdown"), configOverrides);
}
- private void initialRun(List<String> inputMessages, List<String> expectedChangelogMessages) {
+ @Test
+ public void testStopCheckpointV1V2AndRestartStaleCheckpointV2() {
+ List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+ // double check collectors.flush
+ List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+ runStatefulApp(inputMessagesOnInitialRun, inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun, CONFIGS);
+
+
+ Map<String, String> secondConfigRunOverrides = new HashMap<>(CONFIGS);
+ // only write checkpoint v1, making checkpoint v2 stale
+ secondConfigRunOverrides.put(TaskConfig.CHECKPOINT_WRITE_VERSIONS, "1");
+ secondConfigRunOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2, 1");
+ List<String> inputMessagesOnSecondRun = Arrays.asList("77", "78", "79", ":shutdown");
+ // first two are reverts for uncommitted messages from last run for keys 98 and 99
+ expectedChangelogMessagesOnInitialRun = Arrays.asList(null, null, "98", "99", "77", "78", "79");
+ runStatefulApp(inputMessagesOnSecondRun, inputMessagesOnSecondRun, expectedChangelogMessagesOnInitialRun,
+ secondConfigRunOverrides);
+
+ // takes the latest written checkpoint v1 from run 2 since v2 checkpoints are stale
+ List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3", "77", "78", "79", "98", "99");
+
+ Map<String, String> configOverrides = new HashMap<>(CONFIGS);
+ configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2, 1");
+ // Does not have to rewind to the last written v2 checkpoints (1, 2, 3) despite the v2 priority
+ configOverrides.put(TaskConfig.LIVE_CHECKPOINT_MAX_AGE_MS, "0"); // use the latest checkpoint
+ finalRun(CHANGELOG_TOPIC,
+ Collections.emptyList(), expectedInitialStoreContentsOnSecondRun, Collections.emptyList(), configOverrides);
+ }
+
+ private void runStatefulApp(List<String> inputMessages, List<String> expectedInputTopicMessages,
+ List<String> expectedChangelogMessages, Map<String, String> configs) {
// create input topic and produce the first batch of input messages
createTopic(INPUT_TOPIC, 1);
inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
@@ -99,12 +130,12 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
List<ConsumerRecord<String, String>> inputRecords =
consumeMessages(Collections.singletonList(INPUT_TOPIC), inputMessages.size());
List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
- Assert.assertEquals(inputMessages, readInputMessages);
+ Assert.assertEquals(expectedInputTopicMessages, readInputMessages);
}
// run the application
RunApplicationContext context = runApplication(
- new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)), "myApp", CONFIGS);
+ new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)), "myApp", configs);
// wait for the application to finish
context.getRunner().waitForFinish();
@@ -120,14 +151,13 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
LOG.info("Finished initial run");
}
- private void secondRun(String changelogTopic, List<String> expectedChangelogMessages,
- List<String> expectedInitialStoreContents, Map<String, String> overriddenConfigs) {
+ private void finalRun(String changelogTopic, List<String> expectedChangelogMessages,
+ List<String> expectedInitialStoreContents, List<String> inputMessages, Map<String, String> overriddenConfigs) {
// remove previous files so restore is from the checkpointV2
new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR));
// produce the second batch of input messages
- List<String> inputMessages = Arrays.asList("4", "5", "5", ":shutdown");
inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
// run the application