You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/10/23 01:40:31 UTC

[samza] branch 1.3.0 updated (e2928e1 -> cb2707f)

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a change to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git.


    from e2928e1  SAMZA-2354: Improve UDF discovery in samza-sql. (#1192)
     new 0de2385  SAMZA-2352: Use min.compaction.lag.ms to avoid compacting the Kafka changelog topic (#1190)
     new 1b1af6e  Moving writeMetadataFile() invocation to ContainerLaunchUtil from LocalContainerRunner (#1198)
     new cb2707f  SAMZA-2356: [Transactional State] Do not trim changelog if time since last checkpoint is greater than min.compaction.lag. (#1196)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/samza/checkpoint/CheckpointId.java  |  82 +++++++
 .../checkpoint/CheckpointedChangelogOffset.java    |  82 +++++++
 .../org/apache/samza/storage/StorageEngine.java    |   5 +-
 .../org/apache/samza/storage/kv/KeyValueStore.java |   5 +-
 .../org/apache/samza/config/StorageConfig.java     |  16 ++
 .../java/org/apache/samza/config/TaskConfig.java   |   8 +-
 .../operators/util/InternalInMemoryStore.java      |   3 +-
 .../apache/samza/runtime/ContainerLaunchUtil.java  |   1 +
 .../apache/samza/runtime/LocalContainerRunner.java |   3 -
 .../TransactionalStateTaskRestoreManager.java      |  62 ++++--
 .../org/apache/samza/container/TaskInstance.scala  |  23 +-
 .../NonTransactionalStateTaskStorageManager.scala  |   8 +-
 .../apache/samza/storage/TaskStorageManager.scala  |   5 +-
 .../TransactionalStateTaskStorageManager.scala     |  12 +-
 .../org/apache/samza/config/TestStorageConfig.java |  14 ++
 .../operators/impl/store/TestInMemoryStore.java    |   4 +-
 .../apache/samza/storage/MockStorageEngine.java    |   3 +-
 .../TestTransactionalStateTaskRestoreManager.java  | 241 +++++++++++++++++++--
 .../apache/samza/container/TestTaskInstance.scala  |  22 +-
 .../samza/storage/TestContainerStorageManager.java |   2 +-
 .../TestTransactionalStateTaskStorageManager.java  |  20 +-
 .../org/apache/samza/config/KafkaConfig.scala      |  12 +-
 .../org/apache/samza/config/TestKafkaConfig.scala  |  15 +-
 .../kv/inmemory/InMemoryKeyValueStore.scala        |   4 +-
 .../samza/storage/kv/RocksDbKeyValueStore.scala    |  12 +-
 .../samza/storage/kv/LargeMessageSafeStore.java    |   3 +-
 .../samza/storage/kv/AccessLoggedStore.scala       |   3 +-
 .../org/apache/samza/storage/kv/CachedStore.scala  |   5 +-
 .../samza/storage/kv/KeyValueStorageEngine.scala   |   4 +-
 .../org/apache/samza/storage/kv/LoggedStore.scala  |   3 +-
 .../samza/storage/kv/NullSafeKeyValueStore.scala   |   4 +-
 .../samza/storage/kv/SerializedKeyValueStore.scala |   4 +-
 .../samza/storage/kv/MockKeyValueStore.scala       |   4 +-
 .../kv/TransactionalStateIntegrationTest.java      |   2 +-
 ...ransactionalStateMultiStoreIntegrationTest.java |   2 +-
 35 files changed, 584 insertions(+), 114 deletions(-)
 create mode 100644 samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
 create mode 100644 samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java


[samza] 02/03: Moving writeMetadataFile() invocation to ContainerLaunchUtil from LocalContainerRunner (#1198)

Posted by pm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 1b1af6e21a1627d6ba41c4a1ff20745290ad72c2
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Mon Oct 21 13:28:02 2019 -0700

    Moving writeMetadataFile() invocation to ContainerLaunchUtil from LocalContainerRunner (#1198)
---
 .../src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java    | 1 +
 .../src/main/java/org/apache/samza/runtime/LocalContainerRunner.java   | 3 ---
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index e1b9b24..47f7398 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -79,6 +79,7 @@ public class ContainerLaunchUtil {
       JobModel jobModel) {
 
     Config config = jobModel.getConfig();
+    DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, execEnvContainerId, config);
     run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel, config, buildExternalContext(config));
 
     System.exit(0);
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index ebebbe7..07cd637 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -31,7 +31,6 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,8 +71,6 @@ public class LocalContainerRunner {
     MDC.put("jobName", jobName);
     MDC.put("jobId", jobId);
 
-    DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, execEnvContainerId, config);
-
     ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
         ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
 


[samza] 01/03: SAMZA-2352: Use min.compaction.lag.ms to avoid compacting the Kafka changelog topic (#1190)

Posted by pm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 0de2385540747af500efcf134cf80f0748ffff40
Author: xinyuiscool <xi...@linkedin.com>
AuthorDate: Thu Oct 17 14:12:45 2019 -0700

    SAMZA-2352: Use min.compaction.lag.ms to avoid compacting the Kafka changelog topic (#1190)
---
 .../main/java/org/apache/samza/config/StorageConfig.java | 16 ++++++++++++++++
 .../java/org/apache/samza/config/TestStorageConfig.java  | 14 ++++++++++++++
 .../main/scala/org/apache/samza/config/KafkaConfig.scala | 12 +++++++++++-
 .../scala/org/apache/samza/config/TestKafkaConfig.scala  | 15 ++++++++++++++-
 4 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 86c7e7d..2b7ce02 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -31,6 +31,8 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.util.StreamUtil;
 
+import static com.google.common.base.Preconditions.*;
+
 
 /**
  * Config helper methods related to storage.
@@ -45,6 +47,7 @@ public class StorageConfig extends MapConfig {
   public static final String MSG_SERDE = STORE_PREFIX + "%s.msg.serde";
   public static final String CHANGELOG_STREAM = STORE_PREFIX + "%s" + CHANGELOG_SUFFIX;
   public static final String ACCESSLOG_STREAM_SUFFIX = "access-log";
+  // TODO: setting replication.factor does not seem to work as it does in KafkaConfig.
   public static final String CHANGELOG_REPLICATION_FACTOR = STORE_PREFIX + "%s.changelog.replication.factor";
   public static final String CHANGELOG_MAX_MSG_SIZE_BYTES = STORE_PREFIX + "%s.changelog.max.message.size.bytes";
   public static final int DEFAULT_CHANGELOG_MAX_MSG_SIZE_BYTES = 1048576;
@@ -52,6 +55,10 @@ public class StorageConfig extends MapConfig {
   public static final boolean DEFAULT_DISALLOW_LARGE_MESSAGES = false;
   public static final String DROP_LARGE_MESSAGES = STORE_PREFIX + "%s.drop.large.messages";
   public static final boolean DEFAULT_DROP_LARGE_MESSAGES = false;
+  // The log compaction lag time for transactional state change log
+  public static final String MIN_COMPACTION_LAG_MS = "min.compaction.lag.ms";
+  public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX + "%s.changelog." + MIN_COMPACTION_LAG_MS;
+  public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS = TimeUnit.HOURS.toMillis(4);
 
   static final String CHANGELOG_SYSTEM = "job.changelog.system";
   static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
@@ -207,6 +214,15 @@ public class StorageConfig extends MapConfig {
     return getBoolean(String.format(DROP_LARGE_MESSAGES, storeName), DEFAULT_DROP_LARGE_MESSAGES);
   }
 
+  public long getChangelogMinCompactionLagMs(String storeName) {
+    String minCompactLagConfigName = String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, storeName);
+    // Avoid the inconsistency of overriding using stores.x.changelog.kafka...
+    checkArgument(get("stores." + storeName + ".changelog.kafka." + MIN_COMPACTION_LAG_MS) == null,
+        "Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property.");
+
+    return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
+  }
+
   /**
    * Helper method to check if a system has a changelog attached to it.
    */
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index e094de2..713aa49 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.SamzaException;
 import org.junit.Test;
 
@@ -296,4 +297,17 @@ public class TestStorageConfig {
         new MapConfig(ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, STORE_NAME0), "true")));
     assertEquals(true, storageConfig.getDropLargeMessages(STORE_NAME0));
   }
+
+  @Test
+  public void testGetChangelogMinCompactionLagMs() {
+    // empty config, return default lag ms
+    assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS,
+        new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0));
+
+    long lagOverride = TimeUnit.HOURS.toMillis(6);
+    StorageConfig storageConfig = new StorageConfig(
+        new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0),
+            String.valueOf(lagOverride))));
+    assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0));
+  }
 }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index f8051f2..75fbb6b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -336,8 +336,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
         kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
     }
 
+    val storageConfig = new StorageConfig(config)
     kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
-    kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
+    kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(storageConfig.getChangeLogDeleteRetentionInMs(name)))
+
+    // To enable transactional state, we will need to avoid the head of the changelog
+    // (the messages after last checkpoint) being log-compacted so we can trim the rest of the updates.
+    // We use min.compaction.lag.ms to control the compaction time.
+    if (new TaskConfig(this).getTransactionalStateRestoreEnabled) {
+      kafkaChangeLogProperties.setProperty(StorageConfig.MIN_COMPACTION_LAG_MS,
+        String.valueOf(storageConfig.getChangelogMinCompactionLagMs(name)))
+    }
+
     filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
     kafkaChangeLogProperties
   }
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index ea6c3f8..8558a85 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.config
 
 import java.util.Properties
+import java.util.concurrent.TimeUnit
 
 import org.apache.samza.config.factories.PropertiesConfigFactory
 import org.junit.Assert._
@@ -82,11 +83,11 @@ class TestKafkaConfig {
 
   @Test
   def testChangeLogProperties() {
+    props.setProperty("job.changelog.system", SYSTEM_NAME)
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
     props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
     props.setProperty("stores.test2.changelog", "kafka.mychangelog2")
     props.setProperty("stores.test2.changelog.max.message.bytes", "1024000")
-    props.setProperty("job.changelog.system", "kafka")
     props.setProperty("stores.test3.changelog", "otherstream")
     props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
     props.setProperty("stores.test4.rocksdb.ttl.ms", "3600")
@@ -107,6 +108,7 @@ class TestKafkaConfig {
     assertEquals("otherstream", storeToChangelog.getOrDefault("test3", ""))
     assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"))
     assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"))
+    assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
 
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
     val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()
@@ -138,6 +140,17 @@ class TestKafkaConfig {
       String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
     assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("max.message.bytes"),
       KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
+
+    // test compaction config for transactional state
+    val lagOverride = String.valueOf(TimeUnit.HOURS.toMillis(6))
+    props.setProperty(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true")
+    props.setProperty("stores.test2.changelog.min.compaction.lag.ms", lagOverride)
+    val tsMapConfig = new MapConfig(props.asScala.asJava)
+    val tsKafkaConfig = new KafkaConfig(tsMapConfig)
+    assertEquals(String.valueOf(StorageConfig.DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS),
+      tsKafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
+    assertEquals(lagOverride,
+      tsKafkaConfig.getChangelogKafkaProperties("test2").getProperty("min.compaction.lag.ms"))
   }
 
   @Test


[samza] 03/03: SAMZA-2356: [Transactional State] Do not trim changelog if time since last checkpoint is greater than min.compaction.lag. (#1196)

Posted by pm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit cb2707f4151cfa5e58eddc46ffc38cb633397349
Author: Prateek Maheshwari <pr...@utexas.edu>
AuthorDate: Tue Oct 22 17:55:54 2019 -0700

    SAMZA-2356: [Transactional State] Do not trim changelog if time since last checkpoint is greater than min.compaction.lag. (#1196)
---
 .../org/apache/samza/checkpoint/CheckpointId.java  |  82 +++++++
 .../checkpoint/CheckpointedChangelogOffset.java    |  82 +++++++
 .../org/apache/samza/storage/StorageEngine.java    |   5 +-
 .../org/apache/samza/storage/kv/KeyValueStore.java |   5 +-
 .../java/org/apache/samza/config/TaskConfig.java   |   8 +-
 .../operators/util/InternalInMemoryStore.java      |   3 +-
 .../TransactionalStateTaskRestoreManager.java      |  62 ++++--
 .../org/apache/samza/container/TaskInstance.scala  |  23 +-
 .../NonTransactionalStateTaskStorageManager.scala  |   8 +-
 .../apache/samza/storage/TaskStorageManager.scala  |   5 +-
 .../TransactionalStateTaskStorageManager.scala     |  12 +-
 .../operators/impl/store/TestInMemoryStore.java    |   4 +-
 .../apache/samza/storage/MockStorageEngine.java    |   3 +-
 .../TestTransactionalStateTaskRestoreManager.java  | 241 +++++++++++++++++++--
 .../apache/samza/container/TestTaskInstance.scala  |  22 +-
 .../samza/storage/TestContainerStorageManager.java |   2 +-
 .../TestTransactionalStateTaskStorageManager.java  |  20 +-
 .../kv/inmemory/InMemoryKeyValueStore.scala        |   4 +-
 .../samza/storage/kv/RocksDbKeyValueStore.scala    |  12 +-
 .../samza/storage/kv/LargeMessageSafeStore.java    |   3 +-
 .../samza/storage/kv/AccessLoggedStore.scala       |   3 +-
 .../org/apache/samza/storage/kv/CachedStore.scala  |   5 +-
 .../samza/storage/kv/KeyValueStorageEngine.scala   |   4 +-
 .../org/apache/samza/storage/kv/LoggedStore.scala  |   3 +-
 .../samza/storage/kv/NullSafeKeyValueStore.scala   |   4 +-
 .../samza/storage/kv/SerializedKeyValueStore.scala |   4 +-
 .../samza/storage/kv/MockKeyValueStore.scala       |   4 +-
 .../kv/TransactionalStateIntegrationTest.java      |   2 +-
 ...ransactionalStateMultiStoreIntegrationTest.java |   2 +-
 29 files changed, 528 insertions(+), 109 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
new file mode 100644
index 0000000..95dfd24
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Checkpoint ID has the format: [currentTimeMillis, last 6 digits of nanotime], separated by a dash.
+ * This is to avoid conflicts, e.g., when requesting frequent manual commits.
+ *
+ * It is expected that persistent stores use the {@link #toString()} representation of the checkpoint id
+ * as the store checkpoint directory name.
+ */
+@InterfaceStability.Unstable
+public class CheckpointId {
+  public static final String SEPARATOR = "-";
+
+  private final long millis;
+  private final long nanos;
+
+  public CheckpointId(long millis, long nanos) {
+    this.millis = millis;
+    this.nanos = nanos;
+  }
+
+  public static CheckpointId create() {
+    return new CheckpointId(System.currentTimeMillis(), System.nanoTime() % 1000000);
+  }
+
+  public static CheckpointId fromString(String checkpointId) {
+    if (StringUtils.isBlank(checkpointId)) {
+      throw new IllegalArgumentException("Invalid checkpoint id: " + checkpointId);
+    }
+    String[] parts = checkpointId.split(SEPARATOR);
+    return new CheckpointId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
+  }
+
+  public long getMillis() {
+    return millis;
+  }
+
+  public long getNanos() {
+    return nanos;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s%s%s", millis, SEPARATOR, nanos);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CheckpointId that = (CheckpointId) o;
+    return millis == that.millis &&
+        nanos == that.nanos;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(millis, nanos);
+  }
+}
\ No newline at end of file
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java
new file mode 100644
index 0000000..407ce7a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
+ */
+@InterfaceStability.Unstable
+public class CheckpointedChangelogOffset {
+  public static final String SEPARATOR = ":";
+
+  private final CheckpointId checkpointId;
+  private final String offset;
+
+  public CheckpointedChangelogOffset(CheckpointId checkpointId, String offset) {
+    this.checkpointId = checkpointId;
+    this.offset = offset;
+  }
+
+  public static CheckpointedChangelogOffset fromString(String message) {
+    if (StringUtils.isBlank(message)) {
+      throw new IllegalArgumentException("Invalid checkpointed changelog message: " + message);
+    }
+    String[] checkpointIdAndOffset = message.split(":");
+    if (checkpointIdAndOffset.length != 2) {
+      throw new IllegalArgumentException("Invalid checkpointed changelog offset: " + message);
+    }
+    CheckpointId checkpointId = CheckpointId.fromString(checkpointIdAndOffset[0]);
+    String offset = null;
+    if (!"null".equals(checkpointIdAndOffset[1])) {
+      offset = checkpointIdAndOffset[1];
+    }
+    return new CheckpointedChangelogOffset(checkpointId, offset);
+  }
+
+  public CheckpointId getCheckpointId() {
+    return checkpointId;
+  }
+
+  public String getOffset() {
+    return offset;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s%s%s", checkpointId, SEPARATOR, offset);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CheckpointedChangelogOffset that = (CheckpointedChangelogOffset) o;
+    return Objects.equals(checkpointId, that.checkpointId) &&
+        Objects.equals(offset, that.offset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(checkpointId, offset);
+  }
+}
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 804a250..8add1de 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -22,6 +22,8 @@ package org.apache.samza.storage;
 
 import java.nio.file.Path;
 import java.util.Optional;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.system.ChangelogSSPIterator;
 
 /**
@@ -55,7 +57,8 @@ public interface StorageEngine {
   /**
    * Checkpoint store snapshots.
    */
-  Optional<Path> checkpoint(String id);
+  @InterfaceStability.Unstable
+  Optional<Path> checkpoint(CheckpointId id);
 
   /**
    * Close the storage engine
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index d262e29..41faac3 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.CheckpointId;
 
 
 /**
@@ -150,6 +151,6 @@ public interface KeyValueStore<K, V> {
    * Create a persistent checkpoint / snapshot of the current store state and return its path.
    * @return the path of the persistent store checkpoint, or an empty optional if checkpoints are not supported.
    */
-  @InterfaceStability.Evolving
-  Optional<Path> checkpoint(String id);
+  @InterfaceStability.Unstable
+  Optional<Path> checkpoint(CheckpointId id);
 }
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 22e0fa9..b02f6c9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -110,8 +110,8 @@ public class TaskConfig extends MapConfig {
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
   public static final String TRANSACTIONAL_STATE_RESTORE_ENABLED = "task.transactional.state.restore.enabled";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = false;
-  public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE =
-      "task.transactional.state.retain.existing.changelog.state";
+  public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE =
+      "task.transactional.state.retain.existing.state";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;
 
   public TaskConfig(Config config) {
@@ -313,7 +313,7 @@ public class TaskConfig extends MapConfig {
     return getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
   }
 
-  public boolean getTransactionalStateRetainExistingChangelogState() {
-    return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE);
+  public boolean getTransactionalStateRetainExistingState() {
+    return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE);
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index 2ad25eb..6a5ebf8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.operators.util;
 
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueIterator;
@@ -140,7 +141,7 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
   }
 
   @Override
-  public Optional<Path> checkpoint(String id) {
+  public Optional<Path> checkpoint(CheckpointId id) {
     return Optional.empty();
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 4dd7f59..1e54ea1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -31,9 +31,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointedChangelogOffset;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
@@ -230,8 +233,16 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
         SystemStreamPartitionMetadata changelogSSPMetadata = currentChangelogOffsets.get(changelogSSP);
         String oldestOffset = changelogSSPMetadata.getOldestOffset();
         String newestOffset = changelogSSPMetadata.getNewestOffset();
-        String checkpointedOffset = checkpointedChangelogOffsets.get(changelogSSP);
 
+        String checkpointMessage = checkpointedChangelogOffsets.get(changelogSSP);
+        String checkpointedOffset = null;  // can be null if no message, or message has null offset
+        long timeSinceLastCheckpointInMs = Long.MAX_VALUE;
+        if (StringUtils.isNotBlank(checkpointMessage)) {
+          CheckpointedChangelogOffset checkpointedChangelogOffset = CheckpointedChangelogOffset.fromString(checkpointMessage);
+          checkpointedOffset = checkpointedChangelogOffset.getOffset();
+          timeSinceLastCheckpointInMs = System.currentTimeMillis() -
+              checkpointedChangelogOffset.getCheckpointId().getMillis();
+        }
 
         Optional<File> currentDirOptional;
         Optional<List<File>> checkpointDirsOptional;
@@ -255,21 +266,27 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
             storeDirsToDelete.put(storeName, currentDir);
           });
 
-        // first check if checkpointed offset is invalid (i.e., out of range of current offsets, or null)
         if (checkpointedOffset == null && oldestOffset != null) {
           // this can mean that either this is the initial migration for this feature and there are no previously
           // checkpointed changelog offsets, or that this is a new store or changelog topic after the initial migration.
 
           // if this is the first time migration, it might be desirable to retain existing data.
-          // if this is new store or topic, it's possible that the container previously died after writing some data to
-          // the changelog but before a commit, so it's desirable to delete the store, not restore anything and
+          // if this is new store or topic, it is possible that the container previously died after writing some data to
+          // the changelog but before a commit, so it is desirable to delete the store, not restore anything and
           // trim the changelog
 
-          // since we can't easily tell the difference b/w the two scenarios by just looking at the store and changelogs,
+          // since we can't tell the difference b/w the two scenarios by just looking at the store and changelogs,
           // we'll request users to indicate whether to retain existing data using a config flag. this flag should only
           // be set during migrations, and turned off after the first successful commit of the new container (i.e. next
           // deploy). for simplicity, we'll always delete the local store, and restore from changelog if necessary.
 
+          // the former scenario should not be common. the recommended way to opt-in to the transactional state feature
+          // is to first upgrade to the latest samza version but keep the transactional state restore config off.
+          // this will create the store checkpoint directories and write the changelog offset to the checkpoint, but
+          // will not use them during restore. once this is done (i.e. at least one commit after upgrade), the
+          // transactional state restore feature can be enabled on subsequent deploys. this code path exists as a
+          // fail-safe against clearing changelogs in case users do not follow upgrade instructions and enable the
+          // feature directly.
           checkpointDirsOptional.ifPresent(checkpointDirs ->
               checkpointDirs.forEach(checkpointDir -> {
                   LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion since checkpointed " +
@@ -278,7 +295,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
                   storeDirsToDelete.put(storeName, checkpointDir);
                 }));
 
-          if (new TaskConfig(config).getTransactionalStateRetainExistingChangelogState()) {
+          if (new TaskConfig(config).getTransactionalStateRetainExistingState()) {
             // mark for restore from (oldest, newest) to recreate local state.
             LOG.warn("Checkpointed offset for store: {} in task: {} is null. Since retain existing state is true, " +
                 "local state will be fully restored from current changelog contents. " +
@@ -290,7 +307,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
             // mark for restore from (oldest, null) to trim entire changelog.
             storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, null));
           }
-        } else if (// check if the checkpointed offset is in range of current oldest and newest offsets
+        } else if (// check if the checkpointed offset is out of range of current oldest and newest offsets
             admin.offsetComparator(oldestOffset, checkpointedOffset) > 0 ||
             admin.offsetComparator(checkpointedOffset, newestOffset) > 0) {
           // checkpointed offset is out of range. this could mean that this is a TTL topic and the checkpointed
@@ -312,12 +329,29 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
                 "will be fully restored from current changelog contents.", storeName);
             storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
           } else { // persistent logged store
-            // if there exists a valid store checkpoint directory with oldest offset <= local offset <= checkpointed offset,
+            String targetOffset;
+
+            // check checkpoint time against min.compaction.lag.ms. if the checkpoint is older (with a 10% safety
+            // margin, to be conservative when the downtime is close to min.compaction.lag.ms), the changelog may have
+            // been compacted, so restore up to the newest offset instead of the checkpointed offset, with no trim.
+            long minCompactionLagMs = new StorageConfig(config).getChangelogMinCompactionLagMs(storeName);
+            if (timeSinceLastCheckpointInMs > .9 * minCompactionLagMs) {
+              LOG.warn("Checkpointed offset for store: {} in task: {} is: {}. It is in range of oldest: {} and " +
+                  "newest: {} changelog offset. However, time since last checkpoint is: {}, which is greater than " +
+                  "0.9 * min.compaction.lag.ms: {} for the changelog topic. Since there is a chance that " +
+                  "the changelog topic has been compacted, restoring store to the end of the current changelog contents." +
+                  " There is no transactional local state guarantee.", storeName, taskName, checkpointedOffset,
+                  oldestOffset, newestOffset, timeSinceLastCheckpointInMs, minCompactionLagMs);
+              targetOffset = newestOffset;
+            } else {
+              targetOffset = checkpointedOffset;
+            }
+
+            // if there exists a valid store checkpoint directory with oldest offset <= local offset <= target offset,
             // retain it and restore the delta. delete all other checkpoint directories for the store. if more than one such
             // checkpoint directory exists, retain the one with the highest local offset and delete the rest.
             boolean hasValidCheckpointDir = false;
             for (File checkpointDir: checkpointDirsOptional.get()) {
-              // TODO HIGH pmaheshw: should validation check / warn for compact lag config staleness too?
               if (storageManagerUtil.isLoggedStoreValid(
                   storeName, checkpointDir, config, storeChangelogs, taskModel, clock, storeEngines)) {
                 String localOffset = storageManagerUtil.readOffsetFile(
@@ -326,17 +360,17 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
                     checkpointDir, taskName);
 
                 if (admin.offsetComparator(localOffset, oldestOffset) >= 0 &&
-                    admin.offsetComparator(localOffset, checkpointedOffset) <= 0 &&
+                    admin.offsetComparator(localOffset, targetOffset) <= 0 &&
                     (storesToRestore.get(storeName) == null ||
                         admin.offsetComparator(localOffset, storesToRestore.get(storeName).startingOffset) > 0)) {
                   hasValidCheckpointDir = true;
                   LOG.info("Temporarily marking checkpoint dir: {} for store: {} in task: {} for retention. " +
-                          "May be overridden later.", checkpointDir, storeName, taskName);
+                      "May be overridden later.", checkpointDir, storeName, taskName);
                   storeDirToRetain.put(storeName, checkpointDir);
                   // mark for restore even if local == checkpointed, so that the changelog gets trimmed.
                   LOG.info("Temporarily marking store: {} in task: {} for restore from beginning offset: {} to " +
-                          "ending offset: {}. May be overridden later", storeName, taskName, localOffset, checkpointedOffset);
-                  storesToRestore.put(storeName, new RestoreOffsets(localOffset, checkpointedOffset));
+                      "ending offset: {}. May be overridden later", storeName, taskName, localOffset, targetOffset);
+                  storesToRestore.put(storeName, new RestoreOffsets(localOffset, targetOffset));
                 }
               }
             }
@@ -353,7 +387,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
 
             // if the store had not valid checkpoint dirs to retain, restore from changelog
             if (!hasValidCheckpointDir) {
-              storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
+              storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, targetOffset));
             }
           }
         }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index a87c535..2a4f1d6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -24,7 +24,7 @@ import java.util.{Objects, Optional}
 import java.util.concurrent.ScheduledExecutorService
 
 import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointId, CheckpointedChangelogOffset, OffsetManager}
 import org.apache.samza.config.{Config, StreamConfig, TaskConfig}
 import org.apache.samza.context._
 import org.apache.samza.job.model.{JobModel, TaskModel}
@@ -253,23 +253,26 @@ class TaskInstance(
       trace("Flushing state stores for taskName: %s" format taskName)
       newestChangelogOffsets = storageManager.flush()
       trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
+    }
+
+    val checkpointId = CheckpointId.create()
+    if (storageManager != null && newestChangelogOffsets != null) {
+      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
+      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
+    }
+
+    if (newestChangelogOffsets != null) {
       newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        allCheckpointOffsets.put(ssp, newestOffsetOption.orNull)
+        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
+        allCheckpointOffsets.put(ssp, offset)
       }
     }
-
     val checkpoint = new Checkpoint(allCheckpointOffsets)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
-    var checkpointId: String = null
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s" format taskName)
-      checkpointId = storageManager.checkpoint(newestChangelogOffsets.toMap)
-    }
-
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
-    if (storageManager != null && checkpointId != null) {
+    if (storageManager != null) {
       trace("Remove old checkpoint stores for taskName: %s" format taskName)
       storageManager.removeOldCheckpoints(checkpointId)
     }
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala
index cfca0f7..7b38749 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala
@@ -23,6 +23,7 @@ import java.io._
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
@@ -57,11 +58,10 @@ class NonTransactionalStateTaskStorageManager(
     newestChangelogSSPOffsets
   }
 
-  def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String = {
-    null
-  }
+  override def checkpoint(checkpointId: CheckpointId,
+    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
 
-  override def removeOldCheckpoints(checkpointId: String): Unit =  {}
+  override def removeOldCheckpoints(checkpointId: CheckpointId): Unit = {}
 
   @VisibleForTesting
   def stop() {
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index c98461b..50d6418 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -19,6 +19,7 @@
 
 package org.apache.samza.storage
 
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.system.SystemStreamPartition
 
 trait TaskStorageManager {
@@ -27,9 +28,9 @@ trait TaskStorageManager {
 
   def flush(): Map[SystemStreamPartition, Option[String]]
 
-  def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String
+  def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit
 
-  def removeOldCheckpoints(checkpointId: String): Unit
+  def removeOldCheckpoints(checkpointId: CheckpointId): Unit
 
   def stop(): Unit
 
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
index c808866..20c7271 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.filefilter.WildcardFileFilter
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
@@ -56,15 +57,14 @@ class TransactionalStateTaskStorageManager(
     getNewestChangelogSSPOffsets(taskName, storeChangelogs, partition, systemAdmins)
   }
 
-  def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String = {
+  def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {
     debug("Checkpointing stores.")
 
-    val id = System.currentTimeMillis().toString
     val checkpointPaths = containerStorageManager.getAllStores(taskName).asScala
       .filter { case (storeName, storeEngine) =>
         storeEngine.getStoreProperties.isLoggedStore && storeEngine.getStoreProperties.isPersistedToDisk}
       .flatMap { case (storeName, storeEngine) => {
-        val pathOptional = storeEngine.checkpoint(id)
+        val pathOptional = storeEngine.checkpoint(checkpointId)
         if (pathOptional.isPresent) {
           Some(storeName, pathOptional.get())
         } else {
@@ -74,11 +74,9 @@ class TransactionalStateTaskStorageManager(
       .toMap
 
     writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, newestChangelogOffsets)
-
-    id
   }
 
-  def removeOldCheckpoints(latestCheckpointId: String): Unit = {
+  def removeOldCheckpoints(latestCheckpointId: CheckpointId): Unit = {
     if (latestCheckpointId != null) {
       debug("Removing older checkpoints before " + latestCheckpointId)
 
@@ -93,7 +91,7 @@ class TransactionalStateTaskStorageManager(
             val checkpointDirs = storeDir.listFiles(fileFilter)
 
             checkpointDirs
-              .filter(!_.getName.contains(latestCheckpointId))
+              .filter(!_.getName.contains(latestCheckpointId.toString))
               .foreach(checkpointDir => {
                 FileUtils.deleteDirectory(checkpointDir)
               })
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
index c742409..0022cc7 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -19,6 +19,8 @@
 package org.apache.samza.operators.impl.store;
 
 import com.google.common.primitives.UnsignedBytes;
+
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
@@ -132,7 +134,7 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> {
   }
 
   @Override
-  public Optional<Path> checkpoint(String id) {
+  public Optional<Path> checkpoint(CheckpointId id) {
     return Optional.empty();
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
index 405abd7..2fdb81a 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 
 import java.util.List;
 import java.util.Optional;
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.system.ChangelogSSPIterator;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
@@ -63,7 +64,7 @@ public class MockStorageEngine implements StorageEngine {
   }
 
   @Override
-  public Optional<Path> checkpoint(String id) {
+  public Optional<Path> checkpoint(CheckpointId id) {
     return Optional.empty();
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
index 789c0e7..879f8d5 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
@@ -31,6 +31,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointedChangelogOffset;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
@@ -180,8 +182,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -242,9 +246,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "21";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -307,9 +313,79 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
+        } };
+    Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
+        ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
+
+    SystemAdmins mockSystemAdmins = mock(SystemAdmins.class);
+    SystemAdmin mockSystemAdmin = mock(SystemAdmin.class);
+    when(mockSystemAdmins.getSystemAdmin(changelog1SSP.getSystem())).thenReturn(mockSystemAdmin);
+    StorageManagerUtil mockStorageManagerUtil = mock(StorageManagerUtil.class);
+    File mockLoggedStoreBaseDir = mock(File.class);
+    File mockNonLoggedStoreBaseDir = mock(File.class);
+    Config mockConfig = mock(Config.class);
+    Clock mockClock = mock(Clock.class);
+
+    Mockito.when(mockSystemAdmin.offsetComparator(anyString(), anyString()))
+        .thenAnswer((Answer<Integer>) invocation -> {
+            String offset1 = (String) invocation.getArguments()[0];
+            String offset2 = (String) invocation.getArguments()[1];
+            return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+          });
+
+    StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions(
+        mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset,
+        mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil,
+        mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock);
+
+    // ensure that there is nothing to retain or delete
+    assertEquals(0, storeActions.storeDirsToDelete.size());
+    assertEquals(0, storeActions.storeDirsToRetain.size());
+    // ensure that we mark the store for full restore (from current oldest to current newest)
+    assertEquals("10", storeActions.storesToRestore.get(store1Name).startingOffset);
+    assertEquals("20", storeActions.storesToRestore.get(store1Name).endingOffset);
+  }
+
+  /**
+   * This can happen if the changelog offset is valid but the checkpoint is older than min compaction lag ms. E.g., when
+   * the job/container shut down and restarted after a long time.
+   */
+  @Test
+  public void testGetStoreActionsForLoggedNonPersistentStore_FullRestoreIfCheckpointedOffsetInRangeButMaybeCompacted() {
+    TaskModel mockTaskModel = mock(TaskModel.class);
+    TaskName taskName = new TaskName("Partition 0");
+    when(mockTaskModel.getTaskName()).thenReturn(taskName);
+    Partition taskChangelogPartition = new Partition(0);
+    when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition);
+
+    String store1Name = "store1";
+    StorageEngine store1Engine = mock(StorageEngine.class);
+    StoreProperties mockStore1Properties = mock(StoreProperties.class);
+    when(store1Engine.getStoreProperties()).thenReturn(mockStore1Properties);
+    when(mockStore1Properties.isLoggedStore()).thenReturn(true);
+    when(mockStore1Properties.isPersistedToDisk()).thenReturn(false); // non-persistent store
+    Map<String, StorageEngine> mockStoreEngines = ImmutableMap.of(store1Name, store1Engine);
+
+    String changelog1SystemName = "system1";
+    String changelog1StreamName = "store1Changelog";
+    SystemStream changelog1SystemStream = new SystemStream(changelog1SystemName, changelog1StreamName);
+    SystemStreamPartition changelog1SSP = new SystemStreamPartition(changelog1SystemStream, taskChangelogPartition);
+    // NOTE: the checkpointed offset below (5) is < oldest offset (10), not > newest offset (20)
+    SystemStreamPartitionMetadata changelog1SSPMetadata = new SystemStreamPartitionMetadata("10", "20", "21");
+    Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
+
+    String changelog1CheckpointedOffset = "5";
+    CheckpointId checkpointId = CheckpointId.fromString("0-0"); // checkpoint id older than default min.compaction.lag.ms
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(checkpointId, changelog1CheckpointedOffset);
+    Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
+        new HashMap<SystemStreamPartition, String>() { {
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -375,9 +451,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -389,7 +467,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "false");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -437,9 +515,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -451,7 +531,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -499,8 +579,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -562,8 +644,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -643,8 +727,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -727,8 +813,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -810,8 +898,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -893,8 +983,10 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
-        ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+        ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
 
@@ -979,9 +1071,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -993,7 +1087,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -1072,9 +1166,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1086,7 +1182,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "false");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -1163,9 +1259,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1177,7 +1275,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); // should not matter
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); // should not matter
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -1250,9 +1348,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = null;
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1264,7 +1364,7 @@ public class TestTransactionalStateTaskRestoreManager {
     File mockLoggedStoreBaseDir = mock(File.class);
     File mockNonLoggedStoreBaseDir = mock(File.class);
     HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); // should not matter
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); // should not matter
     Config mockConfig = new MapConfig(configMap);
     Clock mockClock = mock(Clock.class);
 
@@ -1343,9 +1443,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "5";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1403,6 +1505,97 @@ public class TestTransactionalStateTaskRestoreManager {
   }
 
   /**
+   * This can happen if the changelog offset is valid but the checkpoint is older than min.compaction.lag.ms, e.g.,
+   * when the job/container was shut down and then restarted after a long time.
+   */
+  @Test
+  public void testGetStoreActionsForLoggedPersistentStore_RestoreFromLocalToNewestIfCheckpointedOffsetInRangeButMaybeCompacted() {
+    TaskModel mockTaskModel = mock(TaskModel.class);
+    TaskName taskName = new TaskName("Partition 0");
+    when(mockTaskModel.getTaskName()).thenReturn(taskName);
+    Partition taskChangelogPartition = new Partition(0);
+    when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition);
+
+    String store1Name = "store1";
+    StorageEngine store1Engine = mock(StorageEngine.class);
+    StoreProperties mockStore1Properties = mock(StoreProperties.class);
+    when(store1Engine.getStoreProperties()).thenReturn(mockStore1Properties);
+    when(mockStore1Properties.isLoggedStore()).thenReturn(true);
+    when(mockStore1Properties.isPersistedToDisk()).thenReturn(true);
+    Map<String, StorageEngine> mockStoreEngines = ImmutableMap.of(store1Name, store1Engine);
+
+    String changelog1SystemName = "system1";
+    String changelog1StreamName = "store1Changelog";
+    SystemStream changelog1SystemStream = new SystemStream(changelog1SystemName, changelog1StreamName);
+    SystemStreamPartition changelog1SSP = new SystemStreamPartition(changelog1SystemStream, taskChangelogPartition);
+    // checkpointed changelog offset (5) is valid: it lies between the oldest (4) and newest (20) changelog offsets
+    SystemStreamPartitionMetadata changelog1SSPMetadata = new SystemStreamPartitionMetadata("4", "20", "21");
+    Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
+
+    String changelog1CheckpointedOffset = "5";
+    CheckpointId checkpointId = CheckpointId.fromString("0-0"); // checkpoint timestamp older than default min compaction lag
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(checkpointId, changelog1CheckpointedOffset);
+    Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
+        new HashMap<SystemStreamPartition, String>() { {
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
+        } };
+    Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
+        ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
+
+    SystemAdmins mockSystemAdmins = mock(SystemAdmins.class);
+    SystemAdmin mockSystemAdmin = mock(SystemAdmin.class);
+    when(mockSystemAdmins.getSystemAdmin(changelog1SSP.getSystem())).thenReturn(mockSystemAdmin);
+    StorageManagerUtil mockStorageManagerUtil = mock(StorageManagerUtil.class);
+    File mockLoggedStoreBaseDir = mock(File.class);
+    File mockNonLoggedStoreBaseDir = mock(File.class);
+    Config mockConfig = mock(Config.class);
+    Clock mockClock = mock(Clock.class);
+
+    File mockCurrentStoreDir = mock(File.class);
+    File mockStoreNewerCheckpointDir = mock(File.class);
+    File mockStoreOlderCheckpointDir = mock(File.class);
+    String olderCheckpointDirLocalOffset = "3";
+    String newerCheckpointDirLocalOffset = "5";
+    when(mockStorageManagerUtil.getTaskStoreDir(eq(mockLoggedStoreBaseDir), eq(store1Name), eq(taskName), any()))
+        .thenReturn(mockCurrentStoreDir);
+    when(mockStorageManagerUtil.getTaskStoreCheckpointDirs(eq(mockLoggedStoreBaseDir), eq(store1Name), eq(taskName), any()))
+        .thenReturn(ImmutableList.of(mockStoreNewerCheckpointDir, mockStoreOlderCheckpointDir));
+    when(mockStorageManagerUtil.isLoggedStoreValid(eq(store1Name), eq(mockStoreNewerCheckpointDir), any(),
+        eq(mockStoreChangelogs), eq(mockTaskModel), any(), eq(mockStoreEngines))).thenReturn(true);
+    when(mockStorageManagerUtil.isLoggedStoreValid(eq(store1Name), eq(mockStoreOlderCheckpointDir), any(),
+        eq(mockStoreChangelogs), eq(mockTaskModel), any(), eq(mockStoreEngines))).thenReturn(true);
+    Set<SystemStreamPartition> mockChangelogSSPs = ImmutableSet.of(changelog1SSP);
+    when(mockStorageManagerUtil.readOffsetFile(eq(mockStoreNewerCheckpointDir), eq(mockChangelogSSPs), eq(false)))
+        .thenReturn(ImmutableMap.of(changelog1SSP, newerCheckpointDirLocalOffset));
+    when(mockStorageManagerUtil.readOffsetFile(eq(mockStoreOlderCheckpointDir), eq(mockChangelogSSPs), eq(false)))
+        .thenReturn(ImmutableMap.of(changelog1SSP, olderCheckpointDirLocalOffset)); // less than checkpointed offset (5)
+
+    Mockito.when(mockSystemAdmin.offsetComparator(anyString(), anyString()))
+        .thenAnswer((Answer<Integer>) invocation -> {
+            String offset1 = (String) invocation.getArguments()[0];
+            String offset2 = (String) invocation.getArguments()[1];
+            return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+          });
+
+    StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions(
+        mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset,
+        mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil,
+        mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock);
+
+    // ensure that the current store dir and the older checkpoint dir are both marked for deletion
+    assertEquals(2, storeActions.storeDirsToDelete.get(store1Name).size());
+    assertTrue(storeActions.storeDirsToDelete.get(store1Name).contains(mockCurrentStoreDir));
+    assertTrue(storeActions.storeDirsToDelete.get(store1Name).contains(mockStoreOlderCheckpointDir));
+    // ensure that the newer checkpoint dir (local offset 5, equal to the checkpointed offset) is retained
+    assertEquals(1, storeActions.storeDirsToRetain.size());
+    assertEquals(mockStoreNewerCheckpointDir, storeActions.storeDirsToRetain.get(store1Name));
+    // ensure that the store is marked for restore to head (from the local checkpoint offset 5 to the current newest 20)
+    assertEquals("5", storeActions.storesToRestore.get(store1Name).startingOffset);
+    assertEquals("20", storeActions.storesToRestore.get(store1Name).endingOffset);
+  }
+
+  /**
    * This can happen if the changelog topic was manually deleted and recreated, and the checkpointed/local changelog
    * offset is not valid anymore.
    */
@@ -1431,9 +1624,11 @@ public class TestTransactionalStateTaskRestoreManager {
     Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
 
     String changelog1CheckpointedOffset = "21";
+    CheckpointedChangelogOffset changelog1CheckpointMessage =
+        new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
     Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
         new HashMap<SystemStreamPartition, String>() { {
-          put(changelog1SSP, changelog1CheckpointedOffset);
+          put(changelog1SSP, changelog1CheckpointMessage.toString());
         } };
     Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
         ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 85cccf8..a54ae72 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -22,7 +22,7 @@ package org.apache.samza.container
 import java.util.Collections
 
 import org.apache.samza.{Partition, SamzaException}
-import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointedChangelogOffset, OffsetManager}
 import org.apache.samza.config.MapConfig
 import org.apache.samza.context.{TaskContext => _, _}
 import org.apache.samza.job.model.TaskModel
@@ -216,10 +216,9 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava)
     val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0))
     val changelogOffsets = Map(changelogSSP -> Some("5"))
-    val checkpointId = "1234"
     when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets)
     when(this.taskStorageManager.flush()).thenReturn(changelogOffsets)
-    when(this.taskStorageManager.checkpoint(any[Map[SystemStreamPartition, Option[String]]])).thenReturn(checkpointId)
+    doNothing().when(this.taskStorageManager).checkpoint(any(), any[Map[SystemStreamPartition, Option[String]]])
     taskInstance.commit
 
     val mockOrder = inOrder(this.offsetManager, this.collector, this.taskTableManager, this.taskStorageManager)
@@ -238,7 +237,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     mockOrder.verify(this.taskStorageManager).flush()
 
     // Stores checkpoints should be created next with the newest changelog offsets
-    mockOrder.verify(this.taskStorageManager).checkpoint(changelogOffsets)
+    mockOrder.verify(this.taskStorageManager).checkpoint(any(), Matchers.eq(changelogOffsets))
 
     // Input checkpoint should be written with the snapshot captured at the beginning of commit and the
     // newest changelog offset captured during storage manager flush
@@ -246,10 +245,10 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     mockOrder.verify(offsetManager).writeCheckpoint(any(), captor.capture)
     val cp = captor.getValue
     assertEquals("4", cp.getOffsets.get(SYSTEM_STREAM_PARTITION))
-    assertEquals("5", cp.getOffsets.get(changelogSSP))
+    assertEquals("5", CheckpointedChangelogOffset.fromString(cp.getOffsets.get(changelogSSP)).getOffset)
 
     // Old checkpointed stores should be cleared
-    mockOrder.verify(this.taskStorageManager).removeOldCheckpoints(checkpointId)
+    mockOrder.verify(this.taskStorageManager).removeOldCheckpoints(any())
     verify(commitsCounter).inc()
   }
 
@@ -269,7 +268,10 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     verify(offsetManager).writeCheckpoint(any(), captor.capture)
     val cp = captor.getValue
     assertEquals("4", cp.getOffsets.get(SYSTEM_STREAM_PARTITION))
-    assertEquals(null, cp.getOffsets.get(changelogSSP))
+    val message = cp.getOffsets.get(changelogSSP)
+    val checkpointedOffset = CheckpointedChangelogOffset.fromString(message)
+    assertNull(checkpointedOffset.getOffset)
+    assertNotNull(checkpointedOffset.getCheckpointId)
     verify(commitsCounter).inc()
   }
 
@@ -320,7 +322,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava)
     when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets)
     when(this.taskStorageManager.flush()).thenReturn(Map[SystemStreamPartition, Option[String]]())
-    when(this.taskStorageManager.checkpoint(any())).thenThrow(new SamzaException("Error creating store checkpoint"))
+    when(this.taskStorageManager.checkpoint(any(), any())).thenThrow(new SamzaException("Error creating store checkpoint"))
 
     try {
       taskInstance.commit
@@ -341,8 +343,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava)
     when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets)
     when(this.taskStorageManager.flush()).thenReturn(Map[SystemStreamPartition, Option[String]]())
-    when(this.taskStorageManager.checkpoint(any())).thenReturn("id")
-    when(this.taskStorageManager.removeOldCheckpoints("id"))
+    doNothing().when(this.taskStorageManager).checkpoint(any(), any())
+    when(this.taskStorageManager.removeOldCheckpoints(any()))
       .thenThrow(new SamzaException("Error clearing old checkpoints"))
 
     try {
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 8f0fbb3..43872e9 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -171,7 +171,7 @@ public class TestContainerStorageManager {
     configMap.put("stores." + STORE_NAME + ".key.serde", "stringserde");
     configMap.put("stores." + STORE_NAME + ".msg.serde", "stringserde");
     configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
-    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
     Config config = new MapConfig(configMap);
 
     Map<String, Serde<Object>> serdes = new HashMap<>();
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
index 69a2379..f2d4972 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.Optional;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.system.SystemAdmin;
@@ -49,7 +50,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -272,7 +272,7 @@ public class TestTransactionalStateTaskStorageManager {
     when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
     when(lpStoreProps.isLoggedStore()).thenReturn(true);
     Path mockPath = mock(Path.class);
-    when(mockLPStore.checkpoint(anyString())).thenReturn(Optional.of(mockPath));
+    when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
 
     StorageEngine mockPStore = mock(StorageEngine.class);
     StoreProperties pStoreProps = mock(StoreProperties.class);
@@ -309,14 +309,14 @@ public class TestTransactionalStateTaskStorageManager {
         ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
 
     // invoke checkpoint
-    tsm.checkpoint(offsets);
+    tsm.checkpoint(CheckpointId.create(), offsets);
 
     // ensure that checkpoint is never called for non-logged persistent stores since they're
     // always cleared on restart.
-    verify(mockPStore, never()).checkpoint(anyString());
+    verify(mockPStore, never()).checkpoint(any());
     // ensure that checkpoint is never called for in-memory stores since they're not persistent.
-    verify(mockIStore, never()).checkpoint(anyString());
-    verify(mockLIStore, never()).checkpoint(anyString());
+    verify(mockIStore, never()).checkpoint(any());
+    verify(mockLIStore, never()).checkpoint(any());
     verify(tsm).writeChangelogOffsetFiles(checkpointPathsCaptor.capture(), any(), eq(offsets));
     Map<String, Path> checkpointPaths = checkpointPathsCaptor.getValue();
     assertEquals(1, checkpointPaths.size());
@@ -332,7 +332,7 @@ public class TestTransactionalStateTaskStorageManager {
     when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
     when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
     when(lpStoreProps.isLoggedStore()).thenReturn(true);
-    when(mockLPStore.checkpoint(anyString())).thenThrow(new IllegalStateException());
+    when(mockLPStore.checkpoint(any())).thenThrow(new IllegalStateException());
     java.util.Map<String, StorageEngine> taskStores =
         ImmutableMap.of("loggedPersistentStore", mockLPStore);
     when(csm.getAllStores(any())).thenReturn(taskStores);
@@ -343,7 +343,7 @@ public class TestTransactionalStateTaskStorageManager {
         ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
 
     // invoke checkpoint
-    tsm.checkpoint(offsets);
+    tsm.checkpoint(CheckpointId.create(), offsets);
     verify(tsm, never()).writeChangelogOffsetFiles(any(), any(), any());
     fail("Should have thrown an exception if error creating store checkpoint");
   }
@@ -358,7 +358,7 @@ public class TestTransactionalStateTaskStorageManager {
     when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
     when(lpStoreProps.isLoggedStore()).thenReturn(true);
     Path mockPath = mock(Path.class);
-    when(mockLPStore.checkpoint(anyString())).thenReturn(Optional.of(mockPath));
+    when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
     java.util.Map<String, StorageEngine> taskStores =
         ImmutableMap.of("loggedPersistentStore", mockLPStore);
     when(csm.getAllStores(any())).thenReturn(taskStores);
@@ -371,7 +371,7 @@ public class TestTransactionalStateTaskStorageManager {
         ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
 
     // invoke checkpoint
-    tsm.checkpoint(offsets);
+    tsm.checkpoint(CheckpointId.create(), offsets);
 
     fail("Should have thrown an exception if error writing offset file.");
   }
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index d482c15..2d26d29 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -25,6 +25,8 @@ import java.nio.file.Path
 import java.util
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
+
 /**
  * In memory implementation of a key value store.
  *
@@ -127,7 +129,7 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
     }
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     // No checkpoint being persisted. State restores from Changelog.
     Optional.empty()
   }
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index a2ae8b0..300177a 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -21,15 +21,13 @@ package org.apache.samza.storage.kv
 
 import java.io.File
 import java.nio.file.{Path, Paths}
-import java.util
-import java.util.{Comparator, Optional}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.{Comparator, Optional}
 
-import org.apache.commons.io.FileUtils
-import org.apache.samza.{SamzaException, checkpoint}
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.config.Config
-import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.util.Logging
 import org.rocksdb.{TtlDB, _}
 
@@ -239,9 +237,9 @@ class RocksDbKeyValueStore(
     trace("Flushed store: %s" format storeName)
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     val checkpoint = Checkpoint.create(db)
-    val checkpointPath = dir.getPath + "-" + id
+    val checkpointPath = dir.getPath + "-" + id.toString
     checkpoint.createCheckpoint(checkpointPath)
     Optional.of(Paths.get(checkpointPath))
   }
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
index 7e514e7..177a986 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,7 +127,7 @@ public class LargeMessageSafeStore implements KeyValueStore<byte[], byte[]> {
   }
 
   @Override
-  public Optional<Path> checkpoint(String id) {
+  public Optional<Path> checkpoint(CheckpointId id) {
     return store.checkpoint(id);
   }
 
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index ace7aa5..8c32793 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -24,6 +24,7 @@ import java.nio.file.Path
 import java.util
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.config.StorageConfig
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.util.Logging
@@ -163,7 +164,7 @@ class AccessLoggedStore[K, V](
     bytes
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 41d2d9f..5c1961c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -20,10 +20,13 @@
 package org.apache.samza.storage.kv
 
 import org.apache.samza.util.Logging
+
 import scala.collection._
 import java.nio.file.Path
 import java.util.{Arrays, Optional}
 
+import org.apache.samza.checkpoint.CheckpointId
+
 /**
  * A write-behind caching layer around the rocksdb store. The purpose of this cache is three-fold:
  * 1. Batch together writes to rocksdb, this turns out to be a great optimization
@@ -293,7 +296,7 @@ class CachedStore[K, V](
     store.snapshot(from, to)
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 61ff059..bc6778e 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -29,6 +29,8 @@ import org.apache.samza.util.TimerUtil
 import java.nio.file.Path
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
+
 /**
  * A key value store.
  *
@@ -208,7 +210,7 @@ class KeyValueStorageEngine[K, V](
     }
   }
 
-  def checkpoint(id: String): Optional[Path] = {
+  def checkpoint(id: CheckpointId): Optional[Path] = {
     updateTimer(metrics.checkpointNs) {
       trace("Checkpointing.")
       metrics.checkpoints.inc
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index 4c238bb..320e801 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -22,6 +22,7 @@ package org.apache.samza.storage.kv
 import java.nio.file.Path
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.util.Logging
 import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStreamPartition}
 import org.apache.samza.task.MessageCollector
@@ -121,7 +122,7 @@ class LoggedStore[K, V](
     store.snapshot(from, to)
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 3bc4674..8bb6fa2 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -22,6 +22,8 @@ package org.apache.samza.storage.kv
 import java.nio.file.Path
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
+
 import scala.collection.JavaConverters._
 
 object NullSafeKeyValueStore {
@@ -99,7 +101,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
     store.snapshot(from, to)
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 169452c..5b3456c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -21,6 +21,8 @@ package org.apache.samza.storage.kv
 
 import java.nio.file.Path
 import java.util.Optional
+
+import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.util.Logging
 import org.apache.samza.serializers._
 
@@ -166,7 +168,7 @@ class SerializedKeyValueStore[K, V](
     }
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     store.checkpoint(id)
   }
 }
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index c20c2c5..c0fc080 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -24,6 +24,8 @@ import java.util
 import java.nio.file.Path
 import java.util.Optional
 
+import org.apache.samza.checkpoint.CheckpointId
+
 /**
  * A mock key-value store wrapper that handles serialization
  */
@@ -76,7 +78,7 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
     throw new UnsupportedOperationException("iterator() not supported")
   }
 
-  override def checkpoint(id: String): Optional[Path] = {
+  override def checkpoint(id: CheckpointId): Optional[Path] = {
     Optional.empty()
   }
 }
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
index 67a4de8..05e6737 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
@@ -89,7 +89,7 @@ public class TransactionalStateIntegrationTest extends StreamApplicationIntegrat
       put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
       put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
       put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
-      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
       put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
       put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
     } };
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
index 0c678b6..41eb1ab 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
@@ -88,7 +88,7 @@ public class TransactionalStateMultiStoreIntegrationTest extends StreamApplicati
       put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
       put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
       put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
-      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
       put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
       put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
     } };