You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@samza.apache.org by dc...@apache.org on 2021/12/22 19:41:13 UTC

[samza] branch master updated: Fix rollback stale checkpoints (#1571)

This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c125b15  Fix rollback stale checkpoints (#1571)
c125b15 is described below

commit c125b15142c921c2276508ae0322d3ea508bb136
Author: Daniel Chen <dc...@apache.org>
AuthorDate: Wed Dec 22 14:39:18 2021 -0500

    Fix rollback stale checkpoints (#1571)
    
    * Fix stale v2 checkpoints on rollback
    
    * Fix checkstyle
    
    * fixed liveCheckpointMaxAgeMillis
    
    Co-authored-by: dxichen <da...@gmail.com>
---
 .../java/org/apache/samza/config/TaskConfig.java   |  6 +++
 .../checkpoint/kafka/KafkaCheckpointManager.scala  | 27 ++++++++----
 .../CheckpointVersionIntegrationTest.java          | 48 ++++++++++++++++++----
 3 files changed, 64 insertions(+), 17 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 06a8727..a2b96eb 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -127,6 +127,8 @@ public class TaskConfig extends MapConfig {
   // checkpoint version to read during container startup
   public static final String CHECKPOINT_READ_VERSIONS = "task.checkpoint.read.versions";
   public static final List<String> DEFAULT_CHECKPOINT_READ_VERSIONS = ImmutableList.of("1");
+  public static final String LIVE_CHECKPOINT_MAX_AGE_MS = "task.live.checkpoint.max.age";
+  public static final long DEFAULT_LIVE_CHECKPOINT_MAX_AGE_MS = 600000L; // 10 mins
 
   public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
@@ -360,6 +362,10 @@ public class TaskConfig extends MapConfig {
     }
   }
 
+  public long getLiveCheckpointMaxAgeMillis() {
+    return getLong(LIVE_CHECKPOINT_MAX_AGE_MS, DEFAULT_LIVE_CHECKPOINT_MAX_AGE_MS);
+  }
+
   public boolean getTransactionalStateCheckpointEnabled() {
     return getBoolean(TRANSACTIONAL_STATE_CHECKPOINT_ENABLED, DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED);
   }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 7dbb9b3..7793b56 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -82,6 +82,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
   val stopConsumerAfterFirstRead: Boolean = new TaskConfig(config).getCheckpointManagerConsumerStopAfterFirstRead
 
   val checkpointReadVersions: util.List[lang.Short] = new TaskConfig(config).getCheckpointReadVersions
+  val liveCheckpointMaxAgeMillis: Long = new TaskConfig(config).getLiveCheckpointMaxAgeMillis
+
 
   /**
     * Create checkpoint stream prior to start.
@@ -243,12 +245,15 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     */
   private def readCheckpoints(): Map[TaskName, Checkpoint] = {
     val checkpoints = mutable.Map[TaskName, Checkpoint]()
+    val checkpointAppendTime = mutable.Map[TaskName, Long]()
 
     val iterator = new SystemStreamPartitionIterator(systemConsumer, checkpointSsp)
     var numMessagesRead = 0
 
     while (iterator.hasNext) {
       val checkpointEnvelope: IncomingMessageEnvelope = iterator.next
+      // Kafka log append time for the checkpoint message
+      val checkpointEnvelopeTs = checkpointEnvelope.getEventTime;
       val offset = checkpointEnvelope.getOffset
 
       numMessagesRead += 1
@@ -290,9 +295,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
           // if checkpoint key version does not match configured checkpoint version to read, skip the message.
           if (checkpointReadVersions.contains(
             KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(checkpointKey.getType))) {
-            if (!checkpoints.contains(checkpointKey.getTaskName) ||
-              shouldOverrideCheckpoint(checkpoints.get(checkpointKey.getTaskName), checkpointKey)) {
-              checkpoints.put(checkpointKey.getTaskName, deserializeCheckpoint(checkpointKey, msgBytes))
+            val taskName = checkpointKey.getTaskName
+            if (!checkpoints.contains(taskName) ||
+              shouldOverrideCheckpoint(checkpoints.get(taskName), checkpointKey, checkpointAppendTime.get(taskName),
+                checkpointEnvelopeTs)) {
+              checkpoints.put(taskName, deserializeCheckpoint(checkpointKey, msgBytes))
+              checkpointAppendTime.put(taskName, checkpointEnvelopeTs)
             } // else ignore the de-prioritized checkpoint
           } else {
             // Ignore and skip the unknown checkpoint key type. We do not want to throw any exceptions for this case
@@ -375,19 +383,22 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     }
   }
 
-  private def shouldOverrideCheckpoint(currentCheckpoint: Option[Checkpoint],
-    newCheckpointKey: KafkaCheckpointLogKey): Boolean = {
+  private def shouldOverrideCheckpoint(currentCheckpoint: Option[Checkpoint], newCheckpointKey: KafkaCheckpointLogKey,
+    currentCheckpointAppendTime: Option[Long], newCheckpointAppendTime: Long): Boolean = {
     val newCheckpointVersion = KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(newCheckpointKey.getType)
     if (newCheckpointVersion == null) {
       // Unknown checkpoint version
       throw new IllegalArgumentException("Unknown checkpoint key type: " + newCheckpointKey.getType +
         " for checkpoint key: " + newCheckpointKey)
     }
-    // Override checkpoint if the current checkpoint does not exist or if new checkpoint has a higher restore
-    // priority than the currently written checkpoint
+    // Override checkpoint if:
+    // 1. The current checkpoint does not exist or
+    // 2. The new checkpoint has a higher restore priority than the currently written checkpoint
+    // 3. The current checkpoint is determined to be stale compared to the new checkpoint timestamp
     currentCheckpoint.isEmpty ||
       checkpointReadVersions.indexOf(newCheckpointVersion) <=
-        checkpointReadVersions.indexOf(currentCheckpoint.get.getVersion)
+        checkpointReadVersions.indexOf(currentCheckpoint.get.getVersion) ||
+      (newCheckpointAppendTime - currentCheckpointAppendTime.get > liveCheckpointMaxAgeMillis)
   }
 
   private def deserializeCheckpoint(checkpointKey: KafkaCheckpointLogKey, checkpointMsgBytes: Array[Byte]): Checkpoint = {
diff --git a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
index 66eb79c..2b6f045 100644
--- a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
@@ -77,7 +77,7 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
     List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
     // double check collectors.flush
     List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
-    initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
+    runStatefulApp(inputMessagesOnInitialRun, inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun, CONFIGS);
 
     // first two are reverts for uncommitted messages from last run for keys 98 and 99
     List<String> expectedChangelogMessagesAfterSecondRun =
@@ -85,11 +85,42 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
     List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
     Map<String, String> configOverrides = new HashMap<>(CONFIGS);
     configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2");
-    secondRun(CHANGELOG_TOPIC,
-        expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, configOverrides);
+    finalRun(CHANGELOG_TOPIC,
+        expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun,
+        Arrays.asList("4", "5", "5", ":shutdown"), configOverrides);
   }
 
-  private void initialRun(List<String> inputMessages, List<String> expectedChangelogMessages) {
+  @Test
+  public void testStopCheckpointV1V2AndRestartStaleCheckpointV2() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    // double check collectors.flush
+    List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+    runStatefulApp(inputMessagesOnInitialRun, inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun, CONFIGS);
+
+
+    Map<String, String> secondConfigRunOverrides = new HashMap<>(CONFIGS);
+    // only write checkpoint v1, making checkpoint v2 stale
+    secondConfigRunOverrides.put(TaskConfig.CHECKPOINT_WRITE_VERSIONS, "1");
+    secondConfigRunOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2, 1");
+    List<String> inputMessagesOnSecondRun = Arrays.asList("77", "78", "79", ":shutdown");
+    // first two are reverts for uncommitted messages from last run for keys 98 and 99
+    expectedChangelogMessagesOnInitialRun = Arrays.asList(null, null, "98", "99", "77", "78", "79");
+    runStatefulApp(inputMessagesOnSecondRun, inputMessagesOnSecondRun, expectedChangelogMessagesOnInitialRun,
+        secondConfigRunOverrides);
+
+    // takes the latest written checkpoint v1 from run 2 since v2 checkpoints are stale
+    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3", "77", "78", "79", "98", "99");
+
+    Map<String, String> configOverrides = new HashMap<>(CONFIGS);
+    configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2, 1");
+    // Does not have to rewind to the last written v2 checkpoints (1, 2, 3) despite the v2 priority
+    configOverrides.put(TaskConfig.LIVE_CHECKPOINT_MAX_AGE_MS, "0"); // use the latest checkpoint
+    finalRun(CHANGELOG_TOPIC,
+        Collections.emptyList(), expectedInitialStoreContentsOnSecondRun, Collections.emptyList(), configOverrides);
+  }
+
+  private void runStatefulApp(List<String> inputMessages, List<String> expectedInputTopicMessages,
+      List<String> expectedChangelogMessages, Map<String, String> configs) {
     // create input topic and produce the first batch of input messages
     createTopic(INPUT_TOPIC, 1);
     inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
@@ -99,12 +130,12 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
       List<ConsumerRecord<String, String>> inputRecords =
           consumeMessages(Collections.singletonList(INPUT_TOPIC), inputMessages.size());
       List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-      Assert.assertEquals(inputMessages, readInputMessages);
+      Assert.assertEquals(expectedInputTopicMessages, readInputMessages);
     }
 
     // run the application
     RunApplicationContext context = runApplication(
-        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)), "myApp", CONFIGS);
+        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)), "myApp", configs);
 
     // wait for the application to finish
     context.getRunner().waitForFinish();
@@ -120,14 +151,13 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
     LOG.info("Finished initial run");
   }
 
-  private void secondRun(String changelogTopic, List<String> expectedChangelogMessages,
-      List<String> expectedInitialStoreContents, Map<String, String> overriddenConfigs) {
+  private void finalRun(String changelogTopic, List<String> expectedChangelogMessages,
+      List<String> expectedInitialStoreContents,  List<String> inputMessages, Map<String, String> overriddenConfigs) {
     // remove previous files so restore is from the checkpointV2
     new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR));
 
     // produce the second batch of input messages
 
-    List<String> inputMessages = Arrays.asList("4", "5", "5", ":shutdown");
     inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
 
     // run the application