You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/10/23 01:40:34 UTC

[samza] 03/03: SAMZA-2356: [Transactional State] Do not trim changelog if time since last checkpoint is greater than min.compaction.lag. (#1196)

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit cb2707f4151cfa5e58eddc46ffc38cb633397349
Author: Prateek Maheshwari <pr...@utexas.edu>
AuthorDate: Tue Oct 22 17:55:54 2019 -0700

    SAMZA-2356: [Transactional State] Do not trim changelog if time since last checkpoint is greater than min.compaction.lag. (#1196)
---
 .../org/apache/samza/checkpoint/CheckpointId.java  |  82 +++++++
 .../checkpoint/CheckpointedChangelogOffset.java    |  82 +++++++
 .../org/apache/samza/storage/StorageEngine.java    |   5 +-
 .../org/apache/samza/storage/kv/KeyValueStore.java |   5 +-
 .../java/org/apache/samza/config/TaskConfig.java   |   8 +-
 .../operators/util/InternalInMemoryStore.java      |   3 +-
 .../TransactionalStateTaskRestoreManager.java      |  62 ++++--
 .../org/apache/samza/container/TaskInstance.scala  |  23 +-
 .../NonTransactionalStateTaskStorageManager.scala  |   8 +-
 .../apache/samza/storage/TaskStorageManager.scala  |   5 +-
 .../TransactionalStateTaskStorageManager.scala     |  12 +-
 .../operators/impl/store/TestInMemoryStore.java    |   4 +-
 .../apache/samza/storage/MockStorageEngine.java    |   3 +-
 .../TestTransactionalStateTaskRestoreManager.java  | 241 +++++++++++++++++++--
 .../apache/samza/container/TestTaskInstance.scala  |  22 +-
 .../samza/storage/TestContainerStorageManager.java |   2 +-
 .../TestTransactionalStateTaskStorageManager.java  |  20 +-
 .../kv/inmemory/InMemoryKeyValueStore.scala        |   4 +-
 .../samza/storage/kv/RocksDbKeyValueStore.scala    |  12 +-
 .../samza/storage/kv/LargeMessageSafeStore.java    |   3 +-
 .../samza/storage/kv/AccessLoggedStore.scala       |   3 +-
 .../org/apache/samza/storage/kv/CachedStore.scala  |   5 +-
 .../samza/storage/kv/KeyValueStorageEngine.scala   |   4 +-
 .../org/apache/samza/storage/kv/LoggedStore.scala  |   3 +-
 .../samza/storage/kv/NullSafeKeyValueStore.scala   |   4 +-
 .../samza/storage/kv/SerializedKeyValueStore.scala |   4 +-
 .../samza/storage/kv/MockKeyValueStore.scala       |   4 +-
 .../kv/TransactionalStateIntegrationTest.java      |   2 +-
 ...ransactionalStateMultiStoreIntegrationTest.java |   2 +-
 29 files changed, 528 insertions(+), 109 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
new file mode 100644
index 0000000..95dfd24
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Identifies a single checkpoint (commit) of a task's stores.
+ *
+ * <p>Checkpoint ID has the format: [currentTimeMillis, last 6 digits of nanotime], separated by a dash.
+ * This is to avoid conflicts, e.g. when requesting frequent manual commits.
+ *
+ * <p>It is expected that persistent stores use the {@link #toString()} representation of the checkpoint id
+ * as the store checkpoint directory name. {@link #fromString(String)} parses that representation back.
+ */
+@InterfaceStability.Unstable
+public class CheckpointId {
+  public static final String SEPARATOR = "-";
+
+  // System.nanoTime() is reduced modulo this value to keep only its last 6 digits.
+  private static final long NANOS_MODULUS = 1000000L;
+
+  private final long millis;
+  private final long nanos;
+
+  public CheckpointId(long millis, long nanos) {
+    this.millis = millis;
+    this.nanos = nanos;
+  }
+
+  /**
+   * Creates a new checkpoint id from the current wall-clock time and the last 6 digits of
+   * {@link System#nanoTime()}.
+   *
+   * @return a new checkpoint id whose nanos component is in the range [0, 999999]
+   */
+  public static CheckpointId create() {
+    // System.nanoTime() may be negative and '%' keeps the sign of the dividend, which would serialize as
+    // e.g. "123--456". floorMod always yields a non-negative remainder.
+    return new CheckpointId(System.currentTimeMillis(), Math.floorMod(System.nanoTime(), NANOS_MODULUS));
+  }
+
+  /**
+   * Parses a checkpoint id from its {@link #toString()} representation.
+   *
+   * @param checkpointId the serialized checkpoint id, in the form {@code <millis>-<nanos>}
+   * @return the parsed checkpoint id
+   * @throws IllegalArgumentException if {@code checkpointId} is blank or not in the expected format
+   */
+  public static CheckpointId fromString(String checkpointId) {
+    if (StringUtils.isBlank(checkpointId)) {
+      throw new IllegalArgumentException("Invalid checkpoint id: " + checkpointId);
+    }
+    // split on the first separator only, so that a negative nanos component (e.g. "123--456") still parses
+    String[] parts = checkpointId.split(SEPARATOR, 2);
+    if (parts.length != 2) {
+      throw new IllegalArgumentException("Invalid checkpoint id: " + checkpointId);
+    }
+    try {
+      return new CheckpointId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid checkpoint id: " + checkpointId, e);
+    }
+  }
+
+  /**
+   * @return the milliseconds component (the wall-clock creation time for ids from {@link #create()})
+   */
+  public long getMillis() {
+    return millis;
+  }
+
+  /**
+   * @return the nanos component (the last 6 digits of {@link System#nanoTime()} for ids from {@link #create()})
+   */
+  public long getNanos() {
+    return nanos;
+  }
+
+  /**
+   * @return the serialized form {@code <millis>-<nanos>}, which {@link #fromString(String)} can parse back
+   */
+  @Override
+  public String toString() {
+    return String.format("%s%s%s", millis, SEPARATOR, nanos);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CheckpointId that = (CheckpointId) o;
+    return millis == that.millis &&
+        nanos == that.nanos;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(millis, nanos);
+  }
+}
\ No newline at end of file
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java
new file mode 100644
index 0000000..407ce7a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A changelog offset recorded in a checkpoint, together with the id of that checkpoint.
+ *
+ * <p>Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
+ * A null offset is serialized as the literal string "null" and is parsed back as null.
+ */
+@InterfaceStability.Unstable
+public class CheckpointedChangelogOffset {
+  public static final String SEPARATOR = ":";
+
+  // Serialized in place of a null offset, since String.format("%s", null) yields "null".
+  private static final String NULL_OFFSET = "null";
+
+  private final CheckpointId checkpointId;
+  private final String offset;
+
+  /**
+   * @param checkpointId the id of the checkpoint this offset was recorded in; must not be null
+   * @param offset the changelog offset, or null if there is none
+   * @throws NullPointerException if {@code checkpointId} is null
+   */
+  public CheckpointedChangelogOffset(CheckpointId checkpointId, String offset) {
+    // a null checkpoint id would serialize as "null:<offset>", which fromString rejects
+    this.checkpointId = Objects.requireNonNull(checkpointId, "checkpointId");
+    this.offset = offset;
+  }
+
+  /**
+   * Parses a checkpointed changelog offset from its {@link #toString()} representation.
+   *
+   * @param message the serialized value, in the form {@code <checkpointId>:<offset>}
+   * @return the parsed value; its offset is null if the serialized offset is "null"
+   * @throws IllegalArgumentException if {@code message} is blank or not in the expected format
+   */
+  public static CheckpointedChangelogOffset fromString(String message) {
+    if (StringUtils.isBlank(message)) {
+      throw new IllegalArgumentException("Invalid checkpointed changelog message: " + message);
+    }
+    // split on the first separator only: the checkpoint id never contains one, but the offset is written
+    // verbatim by toString() and may.
+    String[] checkpointIdAndOffset = message.split(SEPARATOR, 2);
+    if (checkpointIdAndOffset.length != 2 || checkpointIdAndOffset[1].isEmpty()) {
+      throw new IllegalArgumentException("Invalid checkpointed changelog offset: " + message);
+    }
+    CheckpointId checkpointId = CheckpointId.fromString(checkpointIdAndOffset[0]);
+    String offset = NULL_OFFSET.equals(checkpointIdAndOffset[1]) ? null : checkpointIdAndOffset[1];
+    return new CheckpointedChangelogOffset(checkpointId, offset);
+  }
+
+  /**
+   * @return the id of the checkpoint this offset was recorded in
+   */
+  public CheckpointId getCheckpointId() {
+    return checkpointId;
+  }
+
+  /**
+   * @return the changelog offset, or null if none was recorded
+   */
+  public String getOffset() {
+    return offset;
+  }
+
+  /**
+   * @return the serialized form {@code <checkpointId>:<offset>}, as parsed by {@link #fromString(String)}
+   */
+  @Override
+  public String toString() {
+    return String.format("%s%s%s", checkpointId, SEPARATOR, offset);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CheckpointedChangelogOffset that = (CheckpointedChangelogOffset) o;
+    return Objects.equals(checkpointId, that.checkpointId) &&
+        Objects.equals(offset, that.offset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(checkpointId, offset);
+  }
+}
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 804a250..8add1de 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -22,6 +22,8 @@ package org.apache.samza.storage;
 
 import java.nio.file.Path;
 import java.util.Optional;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.system.ChangelogSSPIterator;
 
 /**
@@ -55,7 +57,8 @@ public interface StorageEngine {
   /**
    * Checkpoint store snapshots.
    */
-  Optional<Path> checkpoint(String id);
+  @InterfaceStability.Unstable
+  Optional<Path> checkpoint(CheckpointId id);
 
   /**
    * Close the storage engine
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index d262e29..41faac3 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.CheckpointId;
 
 
 /**
@@ -150,6 +151,6 @@ public interface KeyValueStore<K, V> {
    * Create a persistent checkpoint / snapshot of the current store state and return it's path.
    * @return the path of the persistent store checkpoint, or an empty optional if checkpoints are not supported.
    */
-  @InterfaceStability.Evolving
-  Optional<Path> checkpoint(String id);
+  @InterfaceStability.Unstable
+  Optional<Path> checkpoint(CheckpointId id);
 }
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 22e0fa9..b02f6c9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -110,8 +110,8 @@ public class TaskConfig extends MapConfig {
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
   public static final String TRANSACTIONAL_STATE_RESTORE_ENABLED = "task.transactional.state.restore.enabled";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = false;
-  public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE =
-      "task.transactional.state.retain.existing.changelog.state";
+  public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE =
+      "task.transactional.state.retain.existing.state";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;
 
   public TaskConfig(Config config) {
@@ -313,7 +313,7 @@ public class TaskConfig extends MapConfig {
     return getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
   }
 
-  public boolean getTransactionalStateRetainExistingChangelogState() {
-    return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE);
+  public boolean getTransactionalStateRetainExistingState() {
+    return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE);
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index 2ad25eb..6a5ebf8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.operators.util;
 
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueIterator;
@@ -140,7 +141,7 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
   }
 
   @Override
-  public Optional<Path> checkpoint(String id) {
+  public Optional<Path> checkpoint(CheckpointId id) {
     return Optional.empty();
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 4dd7f59..1e54ea1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -31,9 +31,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointedChangelogOffset;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
@@ -230,8 +233,16 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
         SystemStreamPartitionMetadata changelogSSPMetadata = currentChangelogOffsets.get(changelogSSP);
         String oldestOffset = changelogSSPMetadata.getOldestOffset();
         String newestOffset = changelogSSPMetadata.getNewestOffset();
-        String checkpointedOffset = checkpointedChangelogOffsets.get(changelogSSP);
 
+        String checkpointMessage = checkpointedChangelogOffsets.get(changelogSSP);
+        String checkpointedOffset = null;  // can be null if no message, or message has null offset
+        long timeSinceLastCheckpointInMs = Long.MAX_VALUE;
+        if (StringUtils.isNotBlank(checkpointMessage)) {
+          CheckpointedChangelogOffset checkpointedChangelogOffset = CheckpointedChangelogOffset.fromString(checkpointMessage);
+          checkpointedOffset = checkpointedChangelogOffset.getOffset();
+          timeSinceLastCheckpointInMs = System.currentTimeMillis() -
+              checkpointedChangelogOffset.getCheckpointId().getMillis();
+        }
 
         Optional<File> currentDirOptional;
         Optional<List<File>> checkpointDirsOptional;
@@ -255,21 +266,27 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
             storeDirsToDelete.put(storeName, currentDir);
           });
 
-        // first check if checkpointed offset is invalid (i.e., out of range of current offsets, or null)
         if (checkpointedOffset == null && oldestOffset != null) {
           // this can mean that either this is the initial migration for this feature and there are no previously
           // checkpointed changelog offsets, or that this is a new store or changelog topic after the initial migration.
 
           // if this is the first time migration, it might be desirable to retain existing data.
-          // if this is new store or topic, it's possible that the container previously died after writing some data to
-          // the changelog but before a commit, so it's desirable to delete the store, not restore anything and
+          // if this is new store or topic, it is possible that the container previously died after writing some data to
+          // the changelog but before a commit, so it is desirable to delete the store, not restore anything and
           // trim the changelog
 
-          // since we can't easily tell the difference b/w the two scenarios by just looking at the store and changelogs,
+          // since we can't tell the difference b/w the two scenarios by just looking at the store and changelogs,
           // we'll request users to indicate whether to retain existing data using a config flag. this flag should only
           // be set during migrations, and turned off after the first successful commit of the new container (i.e. next
           // deploy). for simplicity, we'll always delete the local store, and restore from changelog if necessary.
 
+          // the former scenario should not be common. the recommended way to opt-in to the transactional state feature
+          // is to first upgrade to the latest samza version but keep the transactional state restore config off.
+          // this will create the store checkpoint directories and write the changelog offset to the checkpoint, but
+          // will not use them during restore. once this is done (i.e. at least one commit after upgrade), the
+          // transactional state restore feature can be turned on on subsequent deploys. this code path exists as a
+          // fail-safe against clearing changelogs in case users do not follow upgrade instructions and enable the
+          // feature directly.
           checkpointDirsOptional.ifPresent(checkpointDirs ->
               checkpointDirs.forEach(checkpointDir -> {
                   LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion since checkpointed " +
@@ -278,7 +295,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
                   storeDirsToDelete.put(storeName, checkpointDir);
                 }));
 
-          if (new TaskConfig(config).getTransactionalStateRetainExistingChangelogState()) {
+          if (new TaskConfig(config).getTransactionalStateRetainExistingState()) {
             // mark for restore from (oldest, newest) to recreate local state.
             LOG.warn("Checkpointed offset for store: {} in task: {} is null. Since retain existing state is true, " +
                 "local state will be fully restored from current changelog contents. " +
@@ -290,7 +307,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
             // mark for restore from (oldest, null) to trim entire changelog.
             storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, null));
           }
-        } else if (// check if the checkpointed offset is in range of current oldest and newest offsets
+        } else if (// check if the checkpointed offset is out of range of current oldest and newest offsets
             admin.offsetComparator(oldestOffset, checkpointedOffset) > 0 ||
             admin.offsetComparator(checkpointedOffset, newestOffset) > 0) {
           // checkpointed offset is out of range. this could mean that this is a TTL topic and the checkpointed
@@ -312,12 +329,29 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
                 "will be fully restored from current changelog contents.", storeName);
             storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
           } else { // persistent logged store
-            // if there exists a valid store checkpoint directory with oldest offset <= local offset <= checkpointed offset,
+            String targetOffset;
+
+            // check checkpoint time against min.compaction.lag.ms. if older, restore from checkpointed offset to newest
+            // with no trim. be conservative. allow 10% safety margin to avoid deletions when the downtime is close
+            // to min.compaction.lag.ms
+            long minCompactionLagMs = new StorageConfig(config).getChangelogMinCompactionLagMs(storeName);
+            if (timeSinceLastCheckpointInMs > .9 * minCompactionLagMs) {
+              LOG.warn("Checkpointed offset for store: {} in task: {} is: {}. It is in range of oldest: {} and " +
+                  "newest: {} changelog offset. However, time since last checkpoint is: {}, which is greater than " +
+                  "0.9 * min.compaction.lag.ms: {} for the changelog topic. Since there is a chance that" +
+                  "the changelog topic has been compacted, restoring store to the end of the current changelog contents." +
+                  "There is no transactional local state guarantee.", storeName, taskName, checkpointedOffset,
+                  oldestOffset, newestOffset, timeSinceLastCheckpointInMs, minCompactionLagMs);
+              targetOffset = newestOffset;
+            } else {
+              targetOffset = checkpointedOffset;
+            }
+
+            // if there exists a valid store checkpoint directory with oldest offset <= local offset <= target offset,
             // retain it and restore the delta. delete all other checkpoint directories for the store. if more than one such
             // checkpoint directory exists, retain the one with the highest local offset and delete the rest.
             boolean hasValidCheckpointDir = false;
             for (File checkpointDir: checkpointDirsOptional.get()) {
-              // TODO HIGH pmaheshw: should validation check / warn for compact lag config staleness too?
               if (storageManagerUtil.isLoggedStoreValid(
                   storeName, checkpointDir, config, storeChangelogs, taskModel, clock, storeEngines)) {
                 String localOffset = storageManagerUtil.readOffsetFile(
@@ -326,17 +360,17 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
                     checkpointDir, taskName);
 
                 if (admin.offsetComparator(localOffset, oldestOffset) >= 0 &&
-                    admin.offsetComparator(localOffset, checkpointedOffset) <= 0 &&
+                    admin.offsetComparator(localOffset, targetOffset) <= 0 &&
                     (storesToRestore.get(storeName) == null ||
                         admin.offsetComparator(localOffset, storesToRestore.get(storeName).startingOffset) > 0)) {
                   hasValidCheckpointDir = true;
                   LOG.info("Temporarily marking checkpoint dir: {} for store: {} in task: {} for retention. " +
-                          "May be overridden later.", checkpointDir, storeName, taskName);
+                      "May be overridden later.", checkpointDir, storeName, taskName);
                   storeDirToRetain.put(storeName, checkpointDir);
                   // mark for restore even if local == checkpointed, so that the changelog gets trimmed.
                   LOG.info("Temporarily marking store: {} in task: {} for restore from beginning offset: {} to " +
-                          "ending offset: {}. May be overridden later", storeName, taskName, localOffset, checkpointedOffset);
-                  storesToRestore.put(storeName, new RestoreOffsets(localOffset, checkpointedOffset));
+                      "ending offset: {}. May be overridden later", storeName, taskName, localOffset, targetOffset);
+                  storesToRestore.put(storeName, new RestoreOffsets(localOffset, targetOffset));
                 }
               }
             }
@@ -353,7 +387,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
 
             // if the store had not valid checkpoint dirs to retain, restore from changelog
             if (!hasValidCheckpointDir) {
-              storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
+              storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, targetOffset));
             }
           }
         }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index a87c535..2a4f1d6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -24,7 +24,7 @@ import java.util.{Objects, Optional}
 import java.util.concurrent.ScheduledExecutorService
 
 import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointId, CheckpointedChangelogOffset, OffsetManager}
 import org.apache.samza.config.{Config, StreamConfig, TaskConfig}
 import org.apache.samza.context._
 import org.apache.samza.job.model.{JobModel, TaskModel}
@@ -253,23 +253,26 @@ class TaskInstance(
       trace("Flushing state stores for taskName: %s" format taskName)
       newestChangelogOffsets = storageManager.flush()
       trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
+    }
+
+    val checkpointId = CheckpointId.create()
+    if (storageManager != null && newestChangelogOffsets != null) {
+      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
+      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
+    }
+
+    if (newestChangelogOffsets != null) {
       newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        allCheckpointOffsets.put(ssp, newestOffsetOption.orNull)
+        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
+        allCheckpointOffsets.put(ssp, offset)
       }
     }
-
     val checkpoint = new Checkpoint(allCheckpointOffsets)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
-    var checkpointId: String = null
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s" format taskName)
-      checkpointId = storageManager.checkpoint(newestChangelogOffsets.toMap)
-    }
-
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
-    if (storageManager != null && checkpointId != null) {
+    if (storageManager != null) {
       trace("Remove old checkpoint stores for taskName: %s" format taskName)
       storageManager.removeOldCheckpoints(checkpointId)
     }
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala
index cfca0f7..7b38749 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala
@@ -23,6 +23,7 @@ import java.io._
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
@@ -57,11 +58,10 @@ class NonTransactionalStateTaskStorageManager(
     newestChangelogSSPOffsets
   }
 
-  def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String = {
-    null
-  }
+  override def checkpoint(checkpointId: CheckpointId,
+    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
 
-  override def removeOldCheckpoints(checkpointId: String): Unit =  {}
+  override def removeOldCheckpoints(checkpointId: CheckpointId): Unit = {}
 
   @VisibleForTesting
   def stop() {
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index c98461b..50d6418 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -19,6 +19,7 @@
 
 package org.apache.samza.storage
 
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.system.SystemStreamPartition
 
 trait TaskStorageManager {
@@ -27,9 +28,9 @@ trait TaskStorageManager {
 
   def flush(): Map[SystemStreamPartition, Option[String]]
 
-  def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String
+  def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit
 
-  def removeOldCheckpoints(checkpointId: String): Unit
+  def removeOldCheckpoints(checkpointId: CheckpointId): Unit
 
   def stop(): Unit
 
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
index c808866..20c7271 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.filefilter.WildcardFileFilter
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
@@ -56,15 +57,14 @@ class TransactionalStateTaskStorageManager(
     getNewestChangelogSSPOffsets(taskName, storeChangelogs, partition, systemAdmins)
   }
 
-  def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String = {
+  def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {
     debug("Checkpointing stores.")
 
-    val id = System.currentTimeMillis().toString
     val checkpointPaths = containerStorageManager.getAllStores(taskName).asScala
       .filter { case (storeName, storeEngine) =>
         storeEngine.getStoreProperties.isLoggedStore && storeEngine.getStoreProperties.isPersistedToDisk}
       .flatMap { case (storeName, storeEngine) => {
-        val pathOptional = storeEngine.checkpoint(id)
+        val pathOptional = storeEngine.checkpoint(checkpointId)
         if (pathOptional.isPresent) {
           Some(storeName, pathOptional.get())
         } else {
@@ -74,11 +74,9 @@ class TransactionalStateTaskStorageManager(
       .toMap
 
     writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, newestChangelogOffsets)
-
-    id
   }
 
-  def removeOldCheckpoints(latestCheckpointId: String): Unit = {
+  def removeOldCheckpoints(latestCheckpointId: CheckpointId): Unit = {
     if (latestCheckpointId != null) {
       debug("Removing older checkpoints before " + latestCheckpointId)
 
@@ -93,7 +91,7 @@ class TransactionalStateTaskStorageManager(
             val checkpointDirs = storeDir.listFiles(fileFilter)
 
             checkpointDirs
-              .filter(!_.getName.contains(latestCheckpointId))
+              .filter(!_.getName.contains(latestCheckpointId.toString))
               .foreach(checkpointDir => {
                 FileUtils.deleteDirectory(checkpointDir)
               })
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
index c742409..0022cc7 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -19,6 +19,8 @@
 package org.apache.samza.operators.impl.store;
 
 import com.google.common.primitives.UnsignedBytes;
+
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
@@ -132,7 +134,7 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> {
   }
 
  @Override
-  public Optional<Path> checkpoint(String id) {
+  public Optional<Path> checkpoint(CheckpointId id) { // never yields a checkpoint path; always Optional.empty()
    return Optional.empty();
  }
 
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
index 405abd7..2fdb81a 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 
 import java.util.List;
 import java.util.Optional;
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.system.ChangelogSSPIterator;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
@@ -63,7 +64,7 @@ public class MockStorageEngine implements StorageEngine {
   }
 
  @Override
-  public Optional<Path> checkpoint(String id) {
+  public Optional<Path> checkpoint(CheckpointId id) { // mock: never yields a checkpoint path; always Optional.empty()
    return Optional.empty();
  }
 
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
index 789c0e7..879f8d5 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
@@ -31,6 +31,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointedChangelogOffset;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
@@ -180,8 +182,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -242,9 +246,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "21";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -307,9 +313,79 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
+        } };
+    Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
+        ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
+
+    SystemAdmins mockSystemAdmins = mock(SystemAdmins.class);
+    SystemAdmin mockSystemAdmin = mock(SystemAdmin.class);
+    when(mockSystemAdmins.getSystemAdmin(changelog1SSP.getSystem())).thenReturn(mockSystemAdmin);
+    StorageManagerUtil mockStorageManagerUtil = mock(StorageManagerUtil.class);
+    File mockLoggedStoreBaseDir = mock(File.class);
+    File mockNonLoggedStoreBaseDir = mock(File.class);
+    Config mockConfig = mock(Config.class);
+    Clock mockClock = mock(Clock.class);
+
+    Mockito.when(mockSystemAdmin.offsetComparator(anyString(), anyString()))
+        .thenAnswer((Answer<Integer>) invocation -> {
+            String offset1 = (String) invocation.getArguments()[0];
+            String offset2 = (String) invocation.getArguments()[1];
+            return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+          });
+
+    StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions(
+        mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset,
+        mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil,
+        mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock);
+
+    // ensure that there is nothing to retain or delete
+    assertEquals(0, storeActions.storeDirsToDelete.size());
+    assertEquals(0, storeActions.storeDirsToRetain.size());
+    // ensure that we mark the store for full restore (from current oldest to current newest)
+    assertEquals("10", storeActions.storesToRestore.get(store1Name).startingOffset);
+    assertEquals("20", storeActions.storesToRestore.get(store1Name).endingOffset);
+  }
+
+  /**
+   * This can happen if the changelog offset is valid but the checkpoint is older than min compaction lag ms. E.g., when
+   * the job/container shut down and restarted after a long time.
+   */
+  @Test
+  public void testGetStoreActionsForLoggedNonPersistentStore_FullRestoreIfCheckpointedOffsetInRangeButMaybeCompacted() {
+    TaskModel mockTaskModel = mock(TaskModel.class);
+    TaskName taskName = new TaskName("Partition 0");
+    when(mockTaskModel.getTaskName()).thenReturn(taskName);
+    Partition taskChangelogPartition = new Partition(0);
+    when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition);
+
+    String store1Name = "store1";
+    StorageEngine store1Engine = mock(StorageEngine.class);
+    StoreProperties mockStore1Properties = mock(StoreProperties.class);
+    when(store1Engine.getStoreProperties()).thenReturn(mockStore1Properties);
+    when(mockStore1Properties.isLoggedStore()).thenReturn(true);
+    when(mockStore1Properties.isPersistedToDisk()).thenReturn(false); // non-persistent store
+    Map<String, StorageEngine> mockStoreEngines = ImmutableMap.of(store1Name, store1Engine);
+
+    String changelog1SystemName = "system1";
+    String changelog1StreamName = "store1Changelog";
+    SystemStream changelog1SystemStream = new SystemStream(changelog1SystemName, changelog1StreamName);
+    SystemStreamPartition changelog1SSP = new SystemStreamPartition(changelog1SystemStream, taskChangelogPartition);
+    // NOTE(review): checkpointed offset (5) < oldest offset (10), i.e. NOT in range as the test name implies
+    SystemStreamPartitionMetadata changelog1SSPMetadata = new SystemStreamPartitionMetadata("10", "20", "21");
+    Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
+
+    String changelog1CheckpointedOffset = "5";
+    CheckpointId checkpointId = CheckpointId.fromString("0-0"); // checkpoint id older than default min.compaction.lag.ms
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(checkpointId, changelog1CheckpointedOffset);
+    Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
+        new HashMap<SystemStreamPartition, String>() { {
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -375,9 +451,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -389,7 +467,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "false");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -437,9 +515,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -451,7 +531,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -499,8 +579,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -562,8 +644,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -643,8 +727,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -727,8 +813,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -810,8 +898,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -893,8 +983,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -979,9 +1071,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -993,7 +1087,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -1072,9 +1166,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1086,7 +1182,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "false");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -1163,9 +1259,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1177,7 +1275,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); // should not matter
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); // should not matter
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -1250,9 +1348,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1264,7 +1364,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); // should not matter
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); // should not matter
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -1343,9 +1443,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1403,6 +1505,97 @@ public class TestTransactionalStateTaskRestoreManager {
   }
 
   /**
+   * This can happen if the changelog offset is valid but the checkpoint is older than min compaction lag ms. E.g., when
+   * the job/container shut down and restarted after a long time.
+   */
+  @Test
+  public void testGetStoreActionsForLoggedPersistentStore_RestoreFromLocalToNewestIfCheckpointedOffsetInRangeButMaybeCompacted() {
+    TaskModel mockTaskModel = mock(TaskModel.class);
+    TaskName taskName = new TaskName("Partition 0");
+    when(mockTaskModel.getTaskName()).thenReturn(taskName);
+    Partition taskChangelogPartition = new Partition(0);
+    when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition);
+
+    String store1Name = "store1";
+    StorageEngine store1Engine = mock(StorageEngine.class);
+    StoreProperties mockStore1Properties = mock(StoreProperties.class);
+    when(store1Engine.getStoreProperties()).thenReturn(mockStore1Properties);
+    when(mockStore1Properties.isLoggedStore()).thenReturn(true);
+    when(mockStore1Properties.isPersistedToDisk()).thenReturn(true);
+    Map<String, StorageEngine> mockStoreEngines = ImmutableMap.of(store1Name, store1Engine);
+
+    String changelog1SystemName = "system1";
+    String changelog1StreamName = "store1Changelog";
+    SystemStream changelog1SystemStream = new SystemStream(changelog1SystemName, changelog1StreamName);
+    SystemStreamPartition changelog1SSP = new SystemStreamPartition(changelog1SystemStream, taskChangelogPartition);
+    // checkpointed changelog offset (5) is valid: it lies within [oldest = 4, newest = 20]
+    SystemStreamPartitionMetadata changelog1SSPMetadata = new SystemStreamPartitionMetadata("4", "20", "21");
+    Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
+
+    String changelog1CheckpointedOffset = "5";
+    CheckpointId checkpointId = CheckpointId.fromString("0-0"); // checkpoint timestamp older than default min compaction lag
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(checkpointId, changelog1CheckpointedOffset);
+    Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
+        new HashMap<SystemStreamPartition, String>() { {
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
+        } };
+    Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
+        ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
+
+    SystemAdmins mockSystemAdmins = mock(SystemAdmins.class);
+    SystemAdmin mockSystemAdmin = mock(SystemAdmin.class);
+    when(mockSystemAdmins.getSystemAdmin(changelog1SSP.getSystem())).thenReturn(mockSystemAdmin);
+    StorageManagerUtil mockStorageManagerUtil = mock(StorageManagerUtil.class);
+    File mockLoggedStoreBaseDir = mock(File.class);
+    File mockNonLoggedStoreBaseDir = mock(File.class);
+    Config mockConfig = mock(Config.class);
+    Clock mockClock = mock(Clock.class);
+
+    File mockCurrentStoreDir = mock(File.class);
+    File mockStoreNewerCheckpointDir = mock(File.class);
+    File mockStoreOlderCheckpointDir = mock(File.class);
+    String olderCheckpointDirLocalOffset = "3";
+    String newerCheckpointDirLocalOffset = "5";
+    when(mockStorageManagerUtil.getTaskStoreDir(eq(mockLoggedStoreBaseDir), eq(store1Name), eq(taskName), any()))
+        .thenReturn(mockCurrentStoreDir);
+    when(mockStorageManagerUtil.getTaskStoreCheckpointDirs(eq(mockLoggedStoreBaseDir), eq(store1Name), eq(taskName), any()))
+        .thenReturn(ImmutableList.of(mockStoreNewerCheckpointDir, mockStoreOlderCheckpointDir));
+    when(mockStorageManagerUtil.isLoggedStoreValid(eq(store1Name), eq(mockStoreNewerCheckpointDir), any(),
+        eq(mockStoreChangelogs), eq(mockTaskModel), any(), eq(mockStoreEngines))).thenReturn(true);
+    when(mockStorageManagerUtil.isLoggedStoreValid(eq(store1Name), eq(mockStoreOlderCheckpointDir), any(),
+        eq(mockStoreChangelogs), eq(mockTaskModel), any(), eq(mockStoreEngines))).thenReturn(true);
+    Set<SystemStreamPartition> mockChangelogSSPs = ImmutableSet.of(changelog1SSP);
+    when(mockStorageManagerUtil.readOffsetFile(eq(mockStoreNewerCheckpointDir), eq(mockChangelogSSPs), eq(false)))
+        .thenReturn(ImmutableMap.of(changelog1SSP, newerCheckpointDirLocalOffset));
+    when(mockStorageManagerUtil.readOffsetFile(eq(mockStoreOlderCheckpointDir), eq(mockChangelogSSPs), eq(false)))
+        .thenReturn(ImmutableMap.of(changelog1SSP, olderCheckpointDirLocalOffset)); // less than checkpointed offset (5)
+
+    Mockito.when(mockSystemAdmin.offsetComparator(anyString(), anyString()))
+        .thenAnswer((Answer<Integer>) invocation -> {
+            String offset1 = (String) invocation.getArguments()[0];
+            String offset2 = (String) invocation.getArguments()[1];
+            return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+          });
+
+    StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions(
+        mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset,
+        mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil,
+        mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock);
+
+    // current store dir and older checkpoint dir (local offset 3 < checkpointed offset 5) are marked for deletion
+    assertEquals(2, storeActions.storeDirsToDelete.get(store1Name).size());
+    assertTrue(storeActions.storeDirsToDelete.get(store1Name).contains(mockCurrentStoreDir));
+    assertTrue(storeActions.storeDirsToDelete.get(store1Name).contains(mockStoreOlderCheckpointDir));
+    // newer checkpoint dir (local offset 5 == checkpointed offset) is the one retained
+    assertEquals(1, storeActions.storeDirsToRetain.size());
+    assertEquals(mockStoreNewerCheckpointDir, storeActions.storeDirsToRetain.get(store1Name));
+    // store is restored from the retained local offset (5) up to the current newest offset (20), not trimmed at 5
+    assertEquals("5", storeActions.storesToRestore.get(store1Name).startingOffset);
+    assertEquals("20", storeActions.storesToRestore.get(store1Name).endingOffset);
+  }
+
+  /**
    * This can happen if the changelog topic was manually deleted and recreated, and the checkpointed/local changelog
    * offset is not valid anymore.
    */
@@ -1431,9 +1624,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "21";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 85cccf8..a54ae72 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -22,7 +22,7 @@ package org.apache.samza.container
 import java.util.Collections
 
 import org.apache.samza.{Partition, SamzaException}
-import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointedChangelogOffset, OffsetManager}
 import org.apache.samza.config.MapConfig
 import org.apache.samza.context.{TaskContext => _, _}
 import org.apache.samza.job.model.TaskModel
@@ -216,10 +216,9 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava)
     val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0))
     val changelogOffsets = Map(changelogSSP -> Some("5"))
-    val checkpointId = "1234"
     when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets)
     when(this.taskStorageManager.flush()).thenReturn(changelogOffsets)
-    when(this.taskStorageManager.checkpoint(any[Map[SystemStreamPartition, Option[String]]])).thenReturn(checkpointId)
+    doNothing().when(this.taskStorageManager).checkpoint(any(), any[Map[SystemStreamPartition, Option[String]]])
     taskInstance.commit
 
     val mockOrder = inOrder(this.offsetManager, this.collector, this.taskTableManager, this.taskStorageManager)
@@ -238,7 +237,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     mockOrder.verify(this.taskStorageManager).flush()
 
     // Stores checkpoints should be created next with the newest changelog offsets
-    mockOrder.verify(this.taskStorageManager).checkpoint(changelogOffsets)
+    mockOrder.verify(this.taskStorageManager).checkpoint(any(), Matchers.eq(changelogOffsets))
 
     // Input checkpoint should be written with the snapshot captured at the beginning of commit and the
     // newest changelog offset captured during storage manager flush
@@ -246,10 +245,10 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     mockOrder.verify(offsetManager).writeCheckpoint(any(), captor.capture)
     val cp = captor.getValue
     assertEquals("4", cp.getOffsets.get(SYSTEM_STREAM_PARTITION))
-    assertEquals("5", cp.getOffsets.get(changelogSSP))
+    assertEquals("5", CheckpointedChangelogOffset.fromString(cp.getOffsets.get(changelogSSP)).getOffset)
 
     // Old checkpointed stores should be cleared
-    mockOrder.verify(this.taskStorageManager).removeOldCheckpoints(checkpointId)
+    mockOrder.verify(this.taskStorageManager).removeOldCheckpoints(any())
     verify(commitsCounter).inc()
   }
 
@@ -269,7 +268,10 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     verify(offsetManager).writeCheckpoint(any(), captor.capture)
     val cp = captor.getValue
     assertEquals("4", cp.getOffsets.get(SYSTEM_STREAM_PARTITION))
-    assertEquals(null, cp.getOffsets.get(changelogSSP))
+    val message = cp.getOffsets.get(changelogSSP)
+    val checkpointedOffset = CheckpointedChangelogOffset.fromString(message)
+    assertNull(checkpointedOffset.getOffset)
+    assertNotNull(checkpointedOffset.getCheckpointId)
     verify(commitsCounter).inc()
   }
 
@@ -320,7 +322,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava)
     when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets)
     when(this.taskStorageManager.flush()).thenReturn(Map[SystemStreamPartition, Option[String]]())
-    when(this.taskStorageManager.checkpoint(any())).thenThrow(new SamzaException("Error creating store checkpoint"))
+    when(this.taskStorageManager.checkpoint(any(), any())).thenThrow(new SamzaException("Error creating store checkpoint"))
 
     try {
       taskInstance.commit
@@ -341,8 +343,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava)
     when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets)
     when(this.taskStorageManager.flush()).thenReturn(Map[SystemStreamPartition, Option[String]]())
-    when(this.taskStorageManager.checkpoint(any())).thenReturn("id")
-    when(this.taskStorageManager.removeOldCheckpoints("id"))
+    doNothing().when(this.taskStorageManager).checkpoint(any(), any())
+    when(this.taskStorageManager.removeOldCheckpoints(any()))
       .thenThrow(new SamzaException("Error clearing old checkpoints"))
 
     try {
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 8f0fbb3..43872e9 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -171,7 +171,7 @@ public class TestContainerStorageManager {
     configMap.put("stores." + STORE_NAME + ".key.serde", "stringserde");
     configMap.put("stores." + STORE_NAME + ".msg.serde", "stringserde");
     configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
     Config config = new MapConfig(configMap);
 
     Map<String, Serde<Object>> serdes = new HashMap<>();
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
index 69a2379..f2d4972 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.Optional;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.system.SystemAdmin;
@@ -49,7 +50,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -272,7 +272,7 @@ public class TestTransactionalStateTaskStorageManager {
     when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
     when(lpStoreProps.isLoggedStore()).thenReturn(true);
     Path mockPath = mock(Path.class);
-    when(mockLPStore.checkpoint(anyString())).thenReturn(Optional.of(mockPath));
+    when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
 
     StorageEngine mockPStore = mock(StorageEngine.class);
     StoreProperties pStoreProps = mock(StoreProperties.class);
@@ -309,14 +309,14 @@ public class TestTransactionalStateTaskStorageManager {
         ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
 
     // invoke checkpoint
-    tsm.checkpoint(offsets);
+    tsm.checkpoint(CheckpointId.create(), offsets);
 
     // ensure that checkpoint is never called for non-logged persistent stores since they're
     // always cleared on restart.
-    verify(mockPStore, never()).checkpoint(anyString());
+    verify(mockPStore, never()).checkpoint(any());
     // ensure that checkpoint is never called for in-memory stores since they're not persistent.
-    verify(mockIStore, never()).checkpoint(anyString());
-    verify(mockLIStore, never()).checkpoint(anyString());
+    verify(mockIStore, never()).checkpoint(any());
+    verify(mockLIStore, never()).checkpoint(any());
     verify(tsm).writeChangelogOffsetFiles(checkpointPathsCaptor.capture(), any(), eq(offsets));
     Map<String, Path> checkpointPaths = checkpointPathsCaptor.getValue();
     assertEquals(1, checkpointPaths.size());
@@ -332,7 +332,7 @@ public class TestTransactionalStateTaskStorageManager {
     when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
     when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
     when(lpStoreProps.isLoggedStore()).thenReturn(true);
-    when(mockLPStore.checkpoint(anyString())).thenThrow(new IllegalStateException());
+    when(mockLPStore.checkpoint(any())).thenThrow(new IllegalStateException());
     java.util.Map<String, StorageEngine> taskStores =
         ImmutableMap.of("loggedPersistentStore", mockLPStore);
     when(csm.getAllStores(any())).thenReturn(taskStores);
@@ -343,7 +343,7 @@ public class TestTransactionalStateTaskStorageManager {
         ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
 
     // invoke checkpoint
-    tsm.checkpoint(offsets);
+    tsm.checkpoint(CheckpointId.create(), offsets);
     verify(tsm, never()).writeChangelogOffsetFiles(any(), any(), any());
     fail("Should have thrown an exception if error creating store checkpoint");
   }
@@ -358,7 +358,7 @@ public class TestTransactionalStateTaskStorageManager {
     when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
     when(lpStoreProps.isLoggedStore()).thenReturn(true);
     Path mockPath = mock(Path.class);
-    when(mockLPStore.checkpoint(anyString())).thenReturn(Optional.of(mockPath));
+    when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
     java.util.Map<String, StorageEngine> taskStores =
         ImmutableMap.of("loggedPersistentStore", mockLPStore);
     when(csm.getAllStores(any())).thenReturn(taskStores);
@@ -371,7 +371,7 @@ public class TestTransactionalStateTaskStorageManager {
         ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
 
     // invoke checkpoint
-    tsm.checkpoint(offsets);
+    tsm.checkpoint(CheckpointId.create(), offsets);
 
     fail("Should have thrown an exception if error writing offset file.");
   }
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index d482c15..2d26d29 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -25,6 +25,8 @@ import java.nio.file.Path
 import java.util
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
+
 /**
  * In memory implementation of a key value store.
  *
@@ -127,7 +129,7 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
     }
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     // No checkpoint being persisted. State restores from Changelog.
     Optional.empty()
   }
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index a2ae8b0..300177a 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -21,15 +21,13 @@ package org.apache.samza.storage.kv
 
 import java.io.File
 import java.nio.file.{Path, Paths}
-import java.util
-import java.util.{Comparator, Optional}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.{Comparator, Optional}
 
-import org.apache.commons.io.FileUtils
-import org.apache.samza.{SamzaException, checkpoint}
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.config.Config
-import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.util.Logging
 import org.rocksdb.{TtlDB, _}
 
@@ -239,9 +237,9 @@ class RocksDbKeyValueStore(
     trace("Flushed store: %s" format storeName)
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     val checkpoint = Checkpoint.create(db)
-    val checkpointPath = dir.getPath + "-" + id
+    val checkpointPath = dir.getPath + "-" + id.toString
     checkpoint.createCheckpoint(checkpointPath)
     Optional.of(Paths.get(checkpointPath))
   }
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
index 7e514e7..177a986 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,7 +127,7 @@ public class LargeMessageSafeStore implements KeyValueStore<byte[], byte[]> {
   }
 
   @Override
-  public Optional<Path> checkpoint(String id) {
+  public Optional<Path> checkpoint(CheckpointId id) {
     return store.checkpoint(id);
   }
 
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index ace7aa5..8c32793 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -24,6 +24,7 @@ import java.nio.file.Path
 import java.util
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.config.StorageConfig
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.util.Logging
@@ -163,7 +164,7 @@ class AccessLoggedStore[K, V](
     bytes
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 41d2d9f..5c1961c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -20,10 +20,13 @@
 package org.apache.samza.storage.kv
 
 import org.apache.samza.util.Logging
+
 import scala.collection._
 import java.nio.file.Path
 import java.util.{Arrays, Optional}
 
+import org.apache.samza.checkpoint.CheckpointId
+
 /**
  * A write-behind caching layer around the rocksdb store. The purpose of this cache is three-fold:
  * 1. Batch together writes to rocksdb, this turns out to be a great optimization
@@ -293,7 +296,7 @@ class CachedStore[K, V](
     store.snapshot(from, to)
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 61ff059..bc6778e 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -29,6 +29,8 @@ import org.apache.samza.util.TimerUtil
 import java.nio.file.Path
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
+
 /**
  * A key value store.
  *
@@ -208,7 +210,7 @@ class KeyValueStorageEngine[K, V](
     }
   }
 
-  def checkpoint(id: String): Optional[Path] = {
+  def checkpoint(id: CheckpointId): Optional[Path] = {
     updateTimer(metrics.checkpointNs) {
       trace("Checkpointing.")
       metrics.checkpoints.inc
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index 4c238bb..320e801 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -22,6 +22,7 @@ package org.apache.samza.storage.kv
 import java.nio.file.Path
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.util.Logging
 import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStreamPartition}
 import org.apache.samza.task.MessageCollector
@@ -121,7 +122,7 @@ class LoggedStore[K, V](
     store.snapshot(from, to)
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 3bc4674..8bb6fa2 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -22,6 +22,8 @@ package org.apache.samza.storage.kv
 import java.nio.file.Path
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
+
 import scala.collection.JavaConverters._
 
 object NullSafeKeyValueStore {
@@ -99,7 +101,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
     store.snapshot(from, to)
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 169452c..5b3456c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -21,6 +21,8 @@ package org.apache.samza.storage.kv
 
 import java.nio.file.Path
 import java.util.Optional
+
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.util.Logging
 import org.apache.samza.serializers._
 
@@ -166,7 +168,7 @@ class SerializedKeyValueStore[K, V](
     }
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index c20c2c5..c0fc080 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -24,6 +24,8 @@ import java.util
 import java.nio.file.Path
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
+
 /**
  * A mock key-value store wrapper that handles serialization
  */
@@ -76,7 +78,7 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
     throw new UnsupportedOperationException("iterator() not supported")
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     Optional.empty()
   }
 }
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
index 67a4de8..05e6737 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
@@ -89,7 +89,7 @@ public class TransactionalStateIntegrationTest extends StreamApplicationIntegrat
       put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
       put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
       put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
-      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
       put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
       put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
     } };
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
index 0c678b6..41eb1ab 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
@@ -88,7 +88,7 @@ public class TransactionalStateMultiStoreIntegrationTest extends StreamApplicati
       put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
       put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
       put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
-      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
       put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
       put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
     } };