Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/09/09 21:48:38 UTC

[GitHub] [samza] dxichen opened a new pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

dxichen opened a new pull request #1429:
URL: https://github.com/apache/samza/pull/1429


   **Changes:** Updated TaskStorageBackupManager API in order to accommodate async commit to remote stores ([SAMZA-2591](https://issues.apache.org/jira/browse/SAMZA-2590))
   
   **API Changes:** Internal TaskStorageManager changes
   
   **Tests:** Tested with existing unit tests
   
   **Upgrade Instructions:** None
   
   **Usage Instructions:** None


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540379609



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarker.java
##########
@@ -17,21 +17,16 @@
  * under the License.
  */
 
-package org.apache.samza.storage
+package org.apache.samza.checkpoint;
 
-import org.apache.samza.checkpoint.CheckpointId
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.storage.StateBackendFactory;
 
-trait TaskStorageManager {
 
-  def getStore(storeName: String): Option[StorageEngine]
-
-  def flush(): Map[SystemStreamPartition, Option[String]]
-
-  def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit
-
-  def removeOldCheckpoints(checkpointId: CheckpointId): Unit
-
-  def stop(): Unit
+/**
+ * Interface for State Checkpoint Marker for all TaskStorageBackupManagers
+ */
+public interface StateCheckpointMarker {
+  String getFactoryName();
 
-}
\ No newline at end of file
+  StateBackendFactory getFactory();

Review comment:
       You are correct, this can be inferred from the name. Removed.







[GitHub] [samza] mynameborat commented on pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
mynameborat commented on pull request #1429:
URL: https://github.com/apache/samza/pull/1429#issuecomment-690569338


   Can you attach the design document to the ticket (parent or the child one) to give more context on this change? Maybe add an excerpt to this PR description too.





[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540380247



##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskStorageBackupManager.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskStorageBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskStorageBackupManager {
+
+  /**
+   * Commit operation that is synchronous to processing
+   * @param checkpointId Checkpoint id of the current commit
+   * @return The storename to checkpoint of the snapshotted local store
+   */
+  Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+  /**
+   * Commit operation that is asynchronous to message processing,
+   * @param checkpointId Checkpoint id of the current commit
+   * @param stateCheckpointMarkers The map of storename to checkpoint makers returned by the snapshot
+   * @return The future of storename to checkpoint map of the uploaded local store
+   */
+  CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Persist the state locally to the file system
+   * @param checkpointId The id of the checkpoint to be committed
+   * @param stateCheckpointMarkers Uploaded storename to checkpoints markers to be persisted locally
+   */
+  void persistToFilesystem(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Cleanup any local or remote state for obsolete checkpoint information that are older than checkpointId
+   * @param checkpointId The id of the latest successfully committed checkpoint
+   */
+  void cleanUp(CheckpointId checkpointId);
+
+  /**
+   * Used for testing as a shutdown hook to cleanup any allocated resources
+   */
+  void stop();

Review comment:
       Changed to close() and added init().
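
The call order described in the Javadoc above (snapshot, then upload, then persistToFilesystem, then cleanUp) can be sketched as follows. This is only an illustration under stated assumptions: `BackupManager` and `RecordingBackupManager` are hypothetical, simplified stand-ins that use plain strings in place of `CheckpointId` and `StateCheckpointMarker`, and `commit` is not the actual Samza commit path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CommitLifecycleSketch {

  /** Hypothetical, simplified stand-in for the interface in the diff (strings replace the Samza types). */
  interface BackupManager {
    Map<String, String> snapshot(String checkpointId);

    CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> markers);

    void persistToFilesystem(String checkpointId, Map<String, String> markers);

    void cleanUp(String checkpointId);
  }

  /** Records the order of calls so the sequencing can be inspected. */
  static class RecordingBackupManager implements BackupManager {
    final List<String> calls = Collections.synchronizedList(new ArrayList<>());

    @Override
    public Map<String, String> snapshot(String checkpointId) {
      calls.add("snapshot");
      Map<String, String> markers = new HashMap<>();
      markers.put("store-a", "marker-" + checkpointId);
      return markers;
    }

    @Override
    public CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> markers) {
      calls.add("upload");
      // Pretend the remote upload happens on another thread.
      return CompletableFuture.supplyAsync(() -> markers);
    }

    @Override
    public void persistToFilesystem(String checkpointId, Map<String, String> markers) {
      calls.add("persistToFilesystem");
    }

    @Override
    public void cleanUp(String checkpointId) {
      calls.add("cleanUp");
    }
  }

  /** Synchronous snapshot first, then the asynchronous upload, then the post-upload steps. */
  static CompletableFuture<Void> commit(BackupManager manager, String checkpointId) {
    Map<String, String> snapshotMarkers = manager.snapshot(checkpointId);
    return manager.upload(checkpointId, snapshotMarkers)
        .thenAccept(uploadedMarkers -> {
          // Runs only if the upload future completed successfully.
          manager.persistToFilesystem(checkpointId, uploadedMarkers);
          manager.cleanUp(checkpointId);
        });
  }

  public static void main(String[] args) {
    RecordingBackupManager manager = new RecordingBackupManager();
    commit(manager, "cp-1").join();
    System.out.println(manager.calls);
  }
}
```

Chaining with `thenAccept` means that a failed upload skips both persistToFilesystem and cleanUp, which matches the "only called after ... has successfully completed" wording in the Javadoc.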







[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540381378



##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskStorageAdmin.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+public interface TaskStorageAdmin {
+
+  void createResources();

Review comment:
       added







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r594678327



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -181,27 +176,26 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
   }
 
   /**
-   * // TODO MED dchen: fix documentation
-   * Cleanup the commit state for each of the task backup managers
+   * Cleanup on the commit state from the {@link #commit(CheckpointId)} call. Evokes the cleanup the commit state
+   * for each of the task backup managers. Deletes all the directories of checkpoints older than the
+   * latestCheckpointId.
+   *
    * @param latestCheckpointId CheckpointId of the most recent successful commit
    * @param stateCheckpointMarkers map of map(stateBackendFactoryName to map(storeName to state checkpoint markers) from
-   *                              the latest commit
+   *                               the latest commit
    */
   public void cleanUp(CheckpointId latestCheckpointId, Map<String, Map<String, String>> stateCheckpointMarkers) {
     // Call cleanup on each backup manager
     stateCheckpointMarkers.forEach((factoryName, storeSCMs) -> {
       if (stateBackendToBackupManager.containsKey(factoryName)) {
+        LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ", storeSCMs, factoryName);
         TaskBackupManager backupManager = stateBackendToBackupManager.get(factoryName);
-        if (backupManager != null) {
-          backupManager.cleanUp(latestCheckpointId, storeSCMs);
-        }
-        // TODO HIGH dchen when is it ok for backupmanager to be null? should we throw an exception and fail loudly?
+        backupManager.cleanUp(latestCheckpointId, storeSCMs);
       } else {
-        // throw an error and fail instead?
-        LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ", storeSCMs, factoryName);
+        throw new SamzaException(String.format("Checkpointed factory %s not found or initiated for task name %s",

Review comment:
       Maybe better to keep this as warn. I.e. what happens when we go from state backends == (kafka, ambry) to ambry only? Let's document the use case when this is acceptable. 
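
A minimal sketch of the warn-and-skip behaviour suggested here, for the case where a checkpoint still carries markers for a backend that is no longer configured. The `BackupManager` interface, the `skippedFactories` return value, and the use of `System.err` in place of a logger are assumptions made for illustration; this is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CleanupSketch {

  /** Hypothetical, simplified stand-in for TaskBackupManager. */
  interface BackupManager {
    void cleanUp(String checkpointId, Map<String, String> storeMarkers);
  }

  /**
   * Calls cleanUp on every configured backup manager and skips, with a warning,
   * any factory that appears in the checkpoint but is no longer configured.
   * Returns the skipped factory names so callers can inspect them.
   */
  static List<String> cleanUp(String latestCheckpointId,
      Map<String, Map<String, String>> stateCheckpointMarkers,
      Map<String, BackupManager> stateBackendToBackupManager) {
    List<String> skippedFactories = new ArrayList<>();
    stateCheckpointMarkers.forEach((factoryName, storeMarkers) -> {
      BackupManager backupManager = stateBackendToBackupManager.get(factoryName);
      if (backupManager != null) {
        backupManager.cleanUp(latestCheckpointId, storeMarkers);
      } else {
        // Stand-in for LOG.warn; e.g. the job moved from (kafka, ambry) to ambry only.
        System.err.printf("Ignored cleanup for scm: %s due to unknown factory: %s%n", storeMarkers, factoryName);
        skippedFactories.add(factoryName);
      }
    });
    return skippedFactories;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> markers = new HashMap<>();
    markers.put("kafka", Collections.singletonMap("store-a", "offset-10"));
    markers.put("ambry", Collections.singletonMap("store-a", "blob-1"));

    Map<String, BackupManager> managers = new HashMap<>();
    managers.put("ambry", (id, scms) -> System.out.println("cleaned ambry up to " + id));

    System.out.println("skipped: " + cleanUp("cp-2", markers, managers));
  }
}
```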







[GitHub] [samza] prateekm commented on pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on pull request #1429:
URL: https://github.com/apache/samza/pull/1429#issuecomment-840691618


   @dxichen Let's close this PR.





[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r594679611



##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -43,32 +45,40 @@
 public interface TaskBackupManager {
 
   /**
-   * Initializes the TaskBackupManager instance
-   * @param checkpoint Last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
+   * Initializes the TaskBackupManager instance.
+   *
+   * @param checkpoint last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
    */
   void init(@Nullable Checkpoint checkpoint);
 
   /**
-   * Commit operation that is synchronous to processing
+   *  Snapshot is used to capture the current state of the stores in order to persist it to the backup manager in the
+   *  {@link #upload(CheckpointId, Map)} phase. Performs the commit operation that is synchronous
+   *  to processing. Returns the per store name state snapshotted checkpoints to be used in upload.

Review comment:
       "per store state checkpoint markers"










[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540375692



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
 
     Checkpoint that = (Checkpoint) o;
 
-    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
-    return true;
+    return (checkpointId.equals(that.checkpointId)) &&
+        (Objects.equals(inputOffsets, that.inputOffsets)) &&

Review comment:
       it performs a deep comparison
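
For context, `java.util.Objects.equals(a, b)` is null-safe and delegates to `a.equals(b)`; for `java.util.Map` that compares the entries by value rather than by reference. A small sketch follows (the `sameOffsets` helper and the string-keyed maps are illustrative only, not part of the PR):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MapEqualitySketch {

  /** Null-safe comparison; for maps this compares key/value pairs, not object identity. */
  static boolean sameOffsets(Map<String, String> a, Map<String, String> b) {
    return Objects.equals(a, b);
  }

  public static void main(String[] args) {
    Map<String, String> first = new HashMap<>();
    first.put("ssp-0", "5");
    Map<String, String> second = new HashMap<>();
    second.put("ssp-0", "5");

    // Distinct instances with equal entries compare as equal.
    System.out.println(sameOffsets(first, second)); // true

    second.put("ssp-0", "6");
    System.out.println(sameOffsets(first, second)); // false

    // Null handling: both null is equal, exactly one null is not.
    System.out.println(sameOffsets(null, null));    // true
    System.out.println(sameOffsets(first, null));   // false
  }
}
```

Note that this relies on the keys and values having value-based `equals` themselves; array values, for example, would need `Objects.deepEquals` or `Arrays.equals` instead.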







[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r578000163



##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}

Review comment:
       The default is there because the KafkaBackupManagers currently do not need to be initialized, so I don't think a BackupManager should be forced to implement this
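
A short sketch of the design choice described here: a `default` no-op `init()` lets implementations that need no setup omit it, while others override it. The interface and the two implementing classes below are hypothetical stand-ins, not the Samza classes.

```java
public class DefaultInitSketch {

  /** Hypothetical, simplified stand-in for TaskBackupManager. */
  interface BackupManager {
    /** No-op by default, so implementations without setup work can ignore it. */
    default void init() {}

    String name();
  }

  /** Needs no initialization, so it inherits the default init(). */
  static class KafkaLikeManager implements BackupManager {
    @Override
    public String name() {
      return "kafka-like";
    }
  }

  /** Needs setup (for example opening a client), so it overrides init(). */
  static class RemoteLikeManager implements BackupManager {
    boolean initialized = false;

    @Override
    public void init() {
      initialized = true;
    }

    @Override
    public String name() {
      return "remote-like";
    }
  }

  public static void main(String[] args) {
    BackupManager kafkaLike = new KafkaLikeManager();
    RemoteLikeManager remoteLike = new RemoteLikeManager();
    // The caller treats both uniformly and always calls init().
    kafkaLike.init();
    remoteLike.init();
    System.out.println(kafkaLike.name() + " / " + remoteLike.name() + " initialized=" + remoteLike.initialized);
  }
}
```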

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());

Review comment:
       This is useful when we are using the new reading scheme which looks for the checkpoint id on every checkpoint.

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -20,62 +20,70 @@
 package org.apache.samza.storage
 
 import java.io._
+import java.util
+import java.util.concurrent.CompletableFuture
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
-import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{CheckpointId, StateCheckpointMarker}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Manage all the storage engines for a given task
  */
-class NonTransactionalStateTaskStorageManager(
+class KafkaNonTransactionalStateTaskBackupManager(
   taskName: TaskName,
-  containerStorageManager: ContainerStorageManager,
-  storeChangelogs: Map[String, SystemStream] = Map(),
+  taskStores: util.Map[String, StorageEngine],
+  storeChangelogs: util.Map[String, SystemStream] = new util.HashMap[String, SystemStream](),
   systemAdmins: SystemAdmins,
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging with TaskStorageManager {
+  partition: Partition) extends Logging with TaskBackupManager {
 
   private val storageManagerUtil = new StorageManagerUtil
-  private val persistedStores = containerStorageManager.getAllStores(taskName).asScala
+  private val persistedStores = taskStores.asScala
     .filter { case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk }
 
-  def getStore(storeName: String): Option[StorageEngine] =  JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
-
-  def flush(): Map[SystemStreamPartition, Option[String]] = {
+  override def snapshot(checkpointId: CheckpointId): util.Map[String, StateCheckpointMarker] = {
     debug("Flushing stores.")
-    containerStorageManager.getAllStores(taskName).asScala.values.foreach(_.flush)
+    taskStores.asScala.values.foreach(_.flush)
     val newestChangelogSSPOffsets = getNewestChangelogSSPOffsets()
-    writeChangelogOffsetFiles(newestChangelogSSPOffsets)
+    writeChangelogOffsetFiles(KafkaStateCheckpointMarker
+      .stateCheckpointMarkerToSSPmap(newestChangelogSSPOffsets))
     newestChangelogSSPOffsets
   }
 
-  override def checkpoint(checkpointId: CheckpointId,
-    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
+  override def upload(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): CompletableFuture[util.Map[String, StateCheckpointMarker]] = {
+     CompletableFuture.completedFuture(stateCheckpointMarkers)
+  }
+
+  override def persistToFilesystem(checkpointId: CheckpointId,

Review comment:
       It was originally in the snapshot call above; I moved it to this method

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {

Review comment:
       This is kept for backwards compatibility: the old checkpoints in the Kafka topic use this format
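
The format mentioned in the class comment above (checkpoint id and offset separated by a colon) could be round-tripped roughly as follows. This is a sketch only: the class name, field names, and the example value are hypothetical, and it assumes the checkpoint id itself contains no colon.

```java
public class ChangelogOffsetSketch {
  static final String SEPARATOR = ":";

  final String checkpointId;
  final String offset;

  ChangelogOffsetSketch(String checkpointId, String offset) {
    this.checkpointId = checkpointId;
    this.offset = offset;
  }

  /** Parses "checkpointId:offset", splitting on the first colon only. */
  static ChangelogOffsetSketch fromString(String message) {
    if (message == null || message.isEmpty()) {
      throw new IllegalArgumentException("Invalid checkpointed changelog offset: " + message);
    }
    String[] parts = message.split(SEPARATOR, 2);
    if (parts.length != 2) {
      throw new IllegalArgumentException("Invalid checkpointed changelog offset: " + message);
    }
    return new ChangelogOffsetSketch(parts[0], parts[1]);
  }

  /** Serializes back to the same colon-separated form. */
  @Override
  public String toString() {
    return checkpointId + SEPARATOR + offset;
  }

  public static void main(String[] args) {
    ChangelogOffsetSketch parsed = fromString("1599688118000-42:1337");
    System.out.println(parsed.checkpointId + " -> " + parsed.offset);
    System.out.println(parsed);
  }
}
```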

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);

Review comment:
       Good point. Since this is called for the SamzaContainer from a single thread, it does not need to be thread safe. This may change with further patches
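
If a later patch did call these getters from more than one thread, one option would be a small synchronized lazy holder. The sketch below is an assumption about how that could look, not something proposed in the PR; `Lazy` is a hypothetical helper and a plain `Object` stands in for the cache instance.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyCacheSketch {

  /** Hypothetical helper: creates the value at most once, even under concurrent access. */
  static final class Lazy<T> {
    private final Supplier<T> factory;
    private T instance;

    Lazy(Supplier<T> factory) {
      this.factory = factory;
    }

    synchronized T get() {
      if (instance == null) {
        instance = factory.get();
      }
      return instance;
    }
  }

  public static void main(String[] args) {
    AtomicInteger creations = new AtomicInteger();
    // A plain Object stands in for something like the stream metadata cache.
    Lazy<Object> cache = new Lazy<>(() -> {
      creations.incrementAndGet();
      return new Object();
    });

    Object first = cache.get();
    Object second = cache.get();
    System.out.println((first == second) + ", created " + creations.get() + " time(s)");
  }
}
```

The unsynchronized `if (x == null) x = new ...` form in the diff is fine for single-threaded use; the synchronized variant only matters once concurrent callers are possible.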

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  private static SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition(),
+          taskModel.getTaskMode(), new StorageManagerUtil());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Map<String, SystemStream> filteredStoreChangelogs = ContainerStorageManager
+        .getChangelogSystemStreams(containerModel, storeChangelogs, null);
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+    File nonLoggedStoreBaseDir = SamzaContainer.getNonLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          taskModel,
+          taskStores,
+          filteredStoreChangelogs,
+          systemAdmins,
+          null, // TODO @dchen have the restore managers create and manage Kafka consume lifecycle
+          KafkaChangelogStateBackendFactory
+              .getSspCache(systemAdmins, clock, Collections.emptySet()),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          taskModel,
+          filteredStoreChangelogs,
+          taskStores,
+          systemAdmins,
+          KafkaChangelogStateBackendFactory.getStreamCache(systemAdmins, clock),
+          null, // TODO  @dchen have the restore managers create and manage Kafka consume lifecycle

Review comment:
       This path is not used yet, but it will need to be once the restore managers are refactored.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -224,8 +228,8 @@ class TaskInstance(
     val allCheckpointOffsets = new java.util.HashMap[SystemStreamPartition, String]()
     val inputCheckpoint = offsetManager.buildCheckpoint(taskName)
     if (inputCheckpoint != null) {
-      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getOffsets))
-      allCheckpointOffsets.putAll(inputCheckpoint.getOffsets)
+      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getInputOffsets))
+      allCheckpointOffsets.putAll(inputCheckpoint.getInputOffsets)

Review comment:
       Removed `allCheckpointOffsets`.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -73,8 +76,9 @@ class TaskInstance(
 
   private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
     (storeName: String) => {
-      if (storageManager != null && storageManager.getStore(storeName).isDefined) {
-        storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]]
+      if (containerStorageManager != null) {
+        val storeOption = JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption

Review comment:
       Removed the Java `Optional`.







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r594677421



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -181,27 +176,26 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
   }
 
   /**
-   * // TODO MED dchen: fix documentation
-   * Cleanup the commit state for each of the task backup managers
+   * Cleanup on the commit state from the {@link #commit(CheckpointId)} call. Evokes the cleanup the commit state
+   * for each of the task backup managers. Deletes all the directories of checkpoints older than the
+   * latestCheckpointId.
+   *
    * @param latestCheckpointId CheckpointId of the most recent successful commit
    * @param stateCheckpointMarkers map of map(stateBackendFactoryName to map(storeName to state checkpoint markers) from
-   *                              the latest commit
+   *                               the latest commit
    */
   public void cleanUp(CheckpointId latestCheckpointId, Map<String, Map<String, String>> stateCheckpointMarkers) {
     // Call cleanup on each backup manager
     stateCheckpointMarkers.forEach((factoryName, storeSCMs) -> {
       if (stateBackendToBackupManager.containsKey(factoryName)) {
+        LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ", storeSCMs, factoryName);

Review comment:
       Incorrect log?
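
       One reading of this comment: the warning sits inside the `containsKey` branch, so it would fire for factories that are known rather than unknown. A minimal sketch of logging only on the unknown-factory branch follows. The names `CleanUpDispatchSketch` and `BackupManager`, the String checkpoint id, and the returned list of ignored factories are all hypothetical stand-ins, not the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class CleanUpDispatchSketch {
  // Hypothetical stand-in for TaskBackupManager; only the single call used here.
  interface BackupManager {
    void cleanUp(String checkpointId, Map<String, String> storeSCMs);
  }

  // stateCheckpointMarkers: Map<StateBackendFactoryName, Map<StoreName, state checkpoint marker>>
  // Returns the factory names that were skipped, so a caller or test can inspect them.
  static List<String> cleanUp(String checkpointId,
      Map<String, Map<String, String>> stateCheckpointMarkers,
      Map<String, BackupManager> stateBackendToBackupManager) {
    List<String> ignored = new ArrayList<>();
    stateCheckpointMarkers.forEach((factoryName, storeSCMs) -> {
      BackupManager manager = stateBackendToBackupManager.get(factoryName);
      if (manager != null) {
        // Known factory: delegate cleanup, no warning.
        manager.cleanUp(checkpointId, storeSCMs);
      } else {
        // Unknown factory: this is the only branch where the warning applies.
        System.err.printf("Ignored cleanup for scm: %s due to unknown factory: %s%n", storeSCMs, factoryName);
        ignored.add(factoryName);
      }
    });
    return ignored;
  }
}
```

       A single `get` followed by a null check also covers the case where a key is present but mapped to null, which the separate `containsKey` and `get` calls would otherwise need to handle twice.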







[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r521827747



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageBackupManager.scala
##########
@@ -22,11 +22,31 @@ package org.apache.samza.storage
 import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.system.SystemStreamPartition
 
-trait TaskStorageManager {
+trait TaskStorageBackupManager {

Review comment:
       I have removed `getStore` and added the TaskCommitManager. Let me know what you think.







[GitHub] [samza] dxichen closed pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen closed pull request #1429:
URL: https://github.com/apache/samza/pull/1429


   





[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540377717



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {
   public static final String SEPARATOR = ":";
 
   private final CheckpointId checkpointId;

Review comment:
       This file will be deprecated in favor of `KafkaStateCheckpointMarker`.







[GitHub] [samza] dxichen commented on pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on pull request #1429:
URL: https://github.com/apache/samza/pull/1429#issuecomment-845598775


   Merged to feature branch `state-backend-async-commit` in #1489 #1490 #1491 





[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r521827481



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/KafkaTransactionalStateTaskStorageBackupManager.scala
##########
@@ -39,24 +39,28 @@ import scala.collection.JavaConverters._
 /**
  * Manage all the storage engines for a given task
  */
-class TransactionalStateTaskStorageManager(
+class KafkaTransactionalStateTaskStorageBackupManager(

Review comment:
       We don't foresee any shared logic for now. We will change this in the future if we notice any common paths with the new remote store implementation.







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r585971931



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
##########
@@ -263,10 +245,11 @@ static StoreActions getStoreActions(
       String checkpointedOffset = null;  // can be null if no message, or message has null offset
       long timeSinceLastCheckpointInMs = Long.MAX_VALUE;
       if (StringUtils.isNotBlank(checkpointMessage)) {
+        // TODO HIGH dchen fix Checkpoint version handling with stateCheckpointMarker

Review comment:
       Ideally the CheckpointV1 (with KafkaStateChangelogOffset) -> CheckpointV2 (with KafkaSCM) format conversion will happen in getCheckpointedChangelogOffset, and this method will get a Map<Store Name, Kafka SCM>.

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/CheckpointV2Serde.java
##########
@@ -54,60 +59,65 @@
  *   }
  * }
  * </code>
- *
  */
-public class StatefulCheckpointSerde implements Serde<Checkpoint> {
-  private static final StateCheckpointMarkerSerde STATE_CHECKPOINT_MARKER_SERDE = new StateCheckpointMarkerSerde();
+public class CheckpointV2Serde implements Serde<CheckpointV2> {
+  private static final StateCheckpointMarkerSerde SCM_SERDE = new StateCheckpointMarkerSerde();
+
   private final Serde<JsonCheckpoint> jsonCheckpointSerde;
 
-  public StatefulCheckpointSerde() {
+  public CheckpointV2Serde() {
     this.jsonCheckpointSerde = new JsonSerdeV2<>(JsonCheckpoint.class);
   }
 
   @Override
-  public Checkpoint fromBytes(byte[] bytes) {
+  public CheckpointV2 fromBytes(byte[] bytes) {
     try {
       JsonCheckpoint jsonCheckpoint = jsonCheckpointSerde.fromBytes(bytes);
       Map<SystemStreamPartition, String> sspOffsets = new HashMap<>();
       Map<String, List<StateCheckpointMarker>> stateCheckpoints = new HashMap<>();
 
       jsonCheckpoint.getInputOffsets().forEach((sspName, m) -> {

Review comment:
       Minor: s/m/sspInfo or something similar. Maybe add a comment explaining what it contains and why we serialize it in this format. Also extract constants for the key names.

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -78,28 +77,21 @@ public String getChangelogOffset() {
   /**
    * Builds the SSP to kafka offset mapping from map of store name to list of StateCheckpointMarkers
    * containing a KafkaStateCheckpointMarker
-   * @param storeToStateBackendStateMarkers Map of store name to list of StateCheckpointMarkers containing a KafkaStateCheckpointMarker
+   * @param factoryStateBackendStateMarkersMap Map of store name to list of StateCheckpointMarkers containing a
+   *                                           KafkaStateCheckpointMarker
    * @return Map of ssp to option of Kafka offset
    */
   // TODO HIGH dchen add unit tests, fix javadocs
   public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
-      Map<String, List<StateCheckpointMarker>> storeToStateBackendStateMarkers) {
-    Map<String, String> storeToKafkaStateMarker = new HashMap<>();
-    storeToStateBackendStateMarkers.forEach((storeName, scms) -> {
-      String kafkaStateMarker = null;
-      for (StateCheckpointMarker scm : scms) {
-        if (KafkaChangelogStateBackendFactory.class.getName().equals(scm.getStateBackendFactoryName())) {
-          kafkaStateMarker = scm.getStateCheckpointMarker();
-          break; // there should be only one KafkaStateCheckpointMarker per store
-        }
-      }
-      storeToKafkaStateMarker.put(storeName, kafkaStateMarker);
-    });
-    return scmToSSPOffsetMap(storeToKafkaStateMarker);
+      Map<String, Map<String, String>> factoryStateBackendStateMarkersMap) {
+    return scmToSSPOffsetMap(factoryStateBackendStateMarkersMap
+        .getOrDefault(KAFKA_BACKEND_FACTORY_NAME, null));

Review comment:
       Don't return null collections, return empty collections instead.
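
       A minimal sketch of this suggestion, assuming the lookup only needs an immutable fallback: `Map.getOrDefault` can take `Collections.emptyMap()` instead of `null`. The class name, the method name `kafkaMarkers`, and the placeholder constant value are hypothetical; the PR's real `KAFKA_BACKEND_FACTORY_NAME` value is not shown in this thread.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in; not the actual KafkaStateCheckpointMarker code.
final class EmptyCollectionSketch {
  // Placeholder value (assumption); the real constant's value is not shown in the diff.
  static final String KAFKA_BACKEND_FACTORY_NAME = "kafka-factory-placeholder";

  // Input: Map<StateBackendFactoryName, Map<StoreName, serialized state checkpoint marker>>
  static Map<String, String> kafkaMarkers(Map<String, Map<String, String>> factoryToStoreSCMs) {
    // Fall back to an immutable empty map so downstream code never needs a null check.
    return factoryToStoreSCMs.getOrDefault(KAFKA_BACKEND_FACTORY_NAME, Collections.emptyMap());
  }
}
```

       With this shape, a downstream method such as `scmToSSPOffsetMap` can iterate over its argument unconditionally and simply produce an empty result when no Kafka markers are present.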

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -62,37 +65,60 @@ public void start() {
    * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
    */
   public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
-    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
-    LOG.trace("Returned StateCheckpointMarkers from snapshot: {} for taskName: {} checkpoint id: {}", snapshot, taskName, checkpointId);
-    CompletableFuture<Map<String, StateCheckpointMarker>>
-        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
-
-    try {
-      // TODO: Make async with andThen and add thread management for concurrency and add timeouts
-      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get();
-      LOG.trace("Returned StateCheckpointMarkers from upload: {} for taskName: {} checkpoint id: {}", uploadMap, taskName, checkpointId);
-      if (uploadMap != null) {
-        LOG.trace("Persisting stores to file system for taskName: {} with checkpoint id: {}", taskName, checkpointId);
-        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
-      }
+    List<Map<String, StateCheckpointMarker>> stateCheckpoints = new ArrayList<>();

Review comment:
       Minor: add a comment for what this is: (list == one entry per backend factory, map == store name to SCM)

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -62,37 +65,60 @@ public void start() {
    * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
    */
   public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
-    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
-    LOG.trace("Returned StateCheckpointMarkers from snapshot: {} for taskName: {} checkpoint id: {}", snapshot, taskName, checkpointId);
-    CompletableFuture<Map<String, StateCheckpointMarker>>
-        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
-
-    try {
-      // TODO: Make async with andThen and add thread management for concurrency and add timeouts
-      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get();
-      LOG.trace("Returned StateCheckpointMarkers from upload: {} for taskName: {} checkpoint id: {}", uploadMap, taskName, checkpointId);
-      if (uploadMap != null) {
-        LOG.trace("Persisting stores to file system for taskName: {} with checkpoint id: {}", taskName, checkpointId);
-        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
-      }
+    List<Map<String, StateCheckpointMarker>> stateCheckpoints = new ArrayList<>();
+    backendFactoryBackupManagerMap.values().forEach(storageBackupManager -> {
+      Map<String, StateCheckpointMarker> snapshotSCMs = storageBackupManager.snapshot(checkpointId);

Review comment:
       Now that we have multiple state backends, it makes sense to extract store flush and checkpoint dir creation out of the backup manager implementations and move it here as a one time thing.

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -78,28 +77,21 @@ public String getChangelogOffset() {
   /**
    * Builds the SSP to kafka offset mapping from map of store name to list of StateCheckpointMarkers
    * containing a KafkaStateCheckpointMarker
-   * @param storeToStateBackendStateMarkers Map of store name to list of StateCheckpointMarkers containing a KafkaStateCheckpointMarker
+   * @param factoryStateBackendStateMarkersMap Map of store name to list of StateCheckpointMarkers containing a

Review comment:
       Fix param name and description to reflect map contents. 

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -62,37 +65,60 @@ public void start() {
    * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
    */
   public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
-    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
-    LOG.trace("Returned StateCheckpointMarkers from snapshot: {} for taskName: {} checkpoint id: {}", snapshot, taskName, checkpointId);
-    CompletableFuture<Map<String, StateCheckpointMarker>>
-        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
-
-    try {
-      // TODO: Make async with andThen and add thread management for concurrency and add timeouts
-      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get();
-      LOG.trace("Returned StateCheckpointMarkers from upload: {} for taskName: {} checkpoint id: {}", uploadMap, taskName, checkpointId);
-      if (uploadMap != null) {
-        LOG.trace("Persisting stores to file system for taskName: {} with checkpoint id: {}", taskName, checkpointId);
-        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
-      }
+    List<Map<String, StateCheckpointMarker>> stateCheckpoints = new ArrayList<>();
+    backendFactoryBackupManagerMap.values().forEach(storageBackupManager -> {
+      Map<String, StateCheckpointMarker> snapshotSCMs = storageBackupManager.snapshot(checkpointId);
+      LOG.debug("Found snapshot SCMs for taskName: {} checkpoint id: {} to be: {}", taskName, checkpointId, snapshotSCMs);
 
-      // TODO: call commit on multiple backup managers when available
-      return mergeCheckpoints(taskName, Collections.singletonList(uploadMap));
-    } catch (Exception e) {
-      throw new SamzaException("Upload commit portion could not be completed for taskName", e);
-    }
+      CompletableFuture<Map<String, StateCheckpointMarker>> uploadFuture = storageBackupManager.upload(checkpointId, snapshotSCMs);
+
+      try {
+        // TODO: HIGH dchen Make async with andThen and add thread management for concurrency and add timeouts,
+        // need to make upload theads independent
+        Map<String, StateCheckpointMarker> uploadSCMs = uploadFuture.get();
+        LOG.debug("Found uplaod SCMs for taskName: {} checkpoint id: {} to be: {}", taskName, checkpointId, uploadSCMs);
+
+        if (uploadSCMs != null) {
+          LOG.debug("Persisting SCMs to store checkpoint directory for taskName: {} with checkpoint id: {}", taskName,
+              checkpointId);
+          storageBackupManager.persistToFilesystem(checkpointId, uploadSCMs);

Review comment:
       Should we move this (tagging checkpoint dir with a file containing the serialized "Checkpoint/SCM map") out of each backup manager impl and do it once in this class?

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -78,28 +77,21 @@ public String getChangelogOffset() {
   /**
    * Builds the SSP to kafka offset mapping from map of store name to list of StateCheckpointMarkers
    * containing a KafkaStateCheckpointMarker
-   * @param storeToStateBackendStateMarkers Map of store name to list of StateCheckpointMarkers containing a KafkaStateCheckpointMarker
+   * @param factoryStateBackendStateMarkersMap Map of store name to list of StateCheckpointMarkers containing a
+   *                                           KafkaStateCheckpointMarker
    * @return Map of ssp to option of Kafka offset
    */
   // TODO HIGH dchen add unit tests, fix javadocs
   public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
-      Map<String, List<StateCheckpointMarker>> storeToStateBackendStateMarkers) {
-    Map<String, String> storeToKafkaStateMarker = new HashMap<>();
-    storeToStateBackendStateMarkers.forEach((storeName, scms) -> {
-      String kafkaStateMarker = null;
-      for (StateCheckpointMarker scm : scms) {
-        if (KafkaChangelogStateBackendFactory.class.getName().equals(scm.getStateBackendFactoryName())) {
-          kafkaStateMarker = scm.getStateCheckpointMarker();
-          break; // there should be only one KafkaStateCheckpointMarker per store
-        }
-      }
-      storeToKafkaStateMarker.put(storeName, kafkaStateMarker);
-    });
-    return scmToSSPOffsetMap(storeToKafkaStateMarker);
+      Map<String, Map<String, String>> factoryStateBackendStateMarkersMap) {
+    return scmToSSPOffsetMap(factoryStateBackendStateMarkersMap
+        .getOrDefault(KAFKA_BACKEND_FACTORY_NAME, null));
   }
 
   /**
    * Builds a SSP to Kafka offset mapping from map of store name to KafkaStateCheckpointMarkers
+   * @param storeToKafkaStateMarker storeName to serialized KafkaStateCheckpointMarker
+   * @return Map of SSP to Optional offset

Review comment:
       Document which ssps (store changelogs), and when the optional will be empty.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -37,17 +36,18 @@
 
   private final CheckpointId checkpointId;
   private final Map<SystemStreamPartition, String> inputOffsets;
-  private final Map<String, List<StateCheckpointMarker>> stateCheckpointMarkers;
+  private final Map<String, Map<String, String>> stateCheckpointMarkers;
 
   /**
    * Constructs the checkpoint with separated input and state offsets
    * @param checkpointId CheckpointId associated with this checkpoint
    * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
-   * @param stateCheckpointMarkers Map of local state store names and StateCheckpointMarkers for each state backend system
+   * @param stateCheckpointMarkers Map of state backend factory name to map of local state store names
+   *                               to StateCheckpointMarkers

Review comment:
       s/StateCheckpointMarkers/state checkpoint markers/ everywhere, since it's not a class anymore.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -85,8 +85,7 @@ public CheckpointId getCheckpointId() {
    *
    * @return The state checkpoint markers for the checkpoint
    */
-  public Map<String, List<StateCheckpointMarker>> getStateCheckpointMarkers() {
-    // TODO HIGH dchen it might be a lot simpler to make this a Map<StateBackend, Map<StoreName, Marker>>
+  public Map<String, Map<String, String>> getStateCheckpointMarkers() {

Review comment:
       Document map contents.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -79,7 +79,7 @@
    * @param checkpointId The {@link CheckpointId} of the last successfully committed checkpoint
    * @param stateCheckpointMarkers A map of store name to state checkpoint markers from returned by {@link #upload(CheckpointId, Map)} upload}

Review comment:
       s/from returned by/returned by/

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -139,29 +127,4 @@ public void close() {
       }
     });
   }
-
-  // TODO HIGH dchen add javadocs for what this method is doing
-  // TODO HIGH dchen add unit tests.
-  private Map<String, List<StateCheckpointMarker>> getStoreSCMs(TaskName taskName,

Review comment:
       Nice cleanup, thanks!

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -101,33 +96,26 @@ public void start() {
       }
     });
 
-    return getStoreSCMs(taskName, backendFactoryStoreStateMarkers);
+    return backendFactoryStoreStateMarkers;
   }
 
-  // TODO HIGH dchen add javadocs explaining what the params are.
-  public void cleanUp(CheckpointId checkpointId, Map<String, List<StateCheckpointMarker>> stateCheckpointMarkers) {
-    // { state backend factory -> { store name -> state checkpoint marker)
-    Map<String, Map<String, StateCheckpointMarker>> stateBackendToStoreSCMs = new HashMap<>();
-
-    // The number of backend factories is equal to the length of the stateCheckpointMarker per store list
-    stateBackendToBackupManager.keySet().forEach((stateBackendFactoryName) -> {
-      stateBackendToStoreSCMs.put(stateBackendFactoryName, new HashMap<>());
-    });
-
-    stateCheckpointMarkers.forEach((storeName, scmList) -> {
-      scmList.forEach(scm -> {
-        if (stateBackendToStoreSCMs.containsKey(scm.getStateBackendFactoryName())) {
-          stateBackendToStoreSCMs.get(scm.getStateBackendFactoryName()).put(storeName, scm);
-        } else {
-          LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ", scm, scm.getStateBackendFactoryName());
+  /**
+   * Cleanup  each of the task backup managers
+   * @param checkpointId CheckpointId of the most recent successful commit
+   * @param stateCheckpointMarkers map of map(stateBackendFactoryName to map(storeName to StateCheckpointMarkers) from

Review comment:
       Fix param description. Will be more readable to use the format you used in JsonCheckpoint.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -101,33 +96,26 @@ public void start() {
       }
     });
 
-    return getStoreSCMs(taskName, backendFactoryStoreStateMarkers);
+    return backendFactoryStoreStateMarkers;
   }
 
-  // TODO HIGH dchen add javadocs explaining what the params are.
-  public void cleanUp(CheckpointId checkpointId, Map<String, List<StateCheckpointMarker>> stateCheckpointMarkers) {
-    // { state backend factory -> { store name -> state checkpoint marker)
-    Map<String, Map<String, StateCheckpointMarker>> stateBackendToStoreSCMs = new HashMap<>();
-
-    // The number of backend factories is equal to the length of the stateCheckpointMarker per store list
-    stateBackendToBackupManager.keySet().forEach((stateBackendFactoryName) -> {
-      stateBackendToStoreSCMs.put(stateBackendFactoryName, new HashMap<>());
-    });
-
-    stateCheckpointMarkers.forEach((storeName, scmList) -> {
-      scmList.forEach(scm -> {
-        if (stateBackendToStoreSCMs.containsKey(scm.getStateBackendFactoryName())) {
-          stateBackendToStoreSCMs.get(scm.getStateBackendFactoryName()).put(storeName, scm);
-        } else {
-          LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ", scm, scm.getStateBackendFactoryName());
+  /**
+   * Cleanup  each of the task backup managers
+   * @param checkpointId CheckpointId of the most recent successful commit
+   * @param stateCheckpointMarkers map of map(stateBackendFactoryName to map(storeName to StateCheckpointMarkers) from
+   *                              the latest commit
+   */
+  public void cleanUp(CheckpointId checkpointId, Map<String, Map<String, String>> stateCheckpointMarkers) {
+    stateCheckpointMarkers.entrySet().forEach((factoryNameToSCM) -> {
+      String factoryName = factoryNameToSCM.getKey();
+      if (stateBackendToBackupManager.containsKey(factoryName)) {
+        TaskBackupManager backupManager = stateBackendToBackupManager.get(factoryName);
+        if (backupManager != null) {
+          backupManager.cleanUp(checkpointId, factoryNameToSCM.getValue());
         }

Review comment:
       Else throw exception? When would backupManager be null?

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/JsonCheckpoint.java
##########
@@ -29,6 +29,7 @@
 public class JsonCheckpoint {
   private String checkpointId;
   private Map<String, Map<String, String>> inputOffsets;
+  // Map<StorageBackendFactoryName, Map<StoreName, StateCheckpointMarker>>

Review comment:
       +1, maybe use this convention in javadocs at-params/at-returns and comments describing field/variables as well. More readable than the natural language description IMHO.
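
       For illustration, a sketch of how that type-style notation might look in field comments and javadoc tags. The class `MarkerHolderSketch` is hypothetical and exists only to carry the comments; `{@code ...}` is used so the angle brackets render literally in generated javadoc.

```java
import java.util.Collections;
import java.util.Map;

final class MarkerHolderSketch {
  // Map<StateBackendFactoryName, Map<StoreName, StateCheckpointMarker>>
  private final Map<String, Map<String, String>> stateCheckpointMarkers;

  /**
   * @param stateCheckpointMarkers {@code Map<StateBackendFactoryName, Map<StoreName, StateCheckpointMarker>>}
   */
  MarkerHolderSketch(Map<String, Map<String, String>> stateCheckpointMarkers) {
    this.stateCheckpointMarkers = Collections.unmodifiableMap(stateCheckpointMarkers);
  }

  /**
   * @return {@code Map<StateBackendFactoryName, Map<StoreName, StateCheckpointMarker>>}
   */
  Map<String, Map<String, String>> getStateCheckpointMarkers() {
    return stateCheckpointMarkers;
  }
}
```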

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/CheckpointV2Serde.java
##########
@@ -54,60 +59,65 @@
  *   }
  * }
  * </code>
- *
  */
-public class StatefulCheckpointSerde implements Serde<Checkpoint> {
-  private static final StateCheckpointMarkerSerde STATE_CHECKPOINT_MARKER_SERDE = new StateCheckpointMarkerSerde();
+public class CheckpointV2Serde implements Serde<CheckpointV2> {
+  private static final StateCheckpointMarkerSerde SCM_SERDE = new StateCheckpointMarkerSerde();
+
   private final Serde<JsonCheckpoint> jsonCheckpointSerde;
 
-  public StatefulCheckpointSerde() {
+  public CheckpointV2Serde() {
     this.jsonCheckpointSerde = new JsonSerdeV2<>(JsonCheckpoint.class);
   }
 
   @Override
-  public Checkpoint fromBytes(byte[] bytes) {
+  public CheckpointV2 fromBytes(byte[] bytes) {
     try {
       JsonCheckpoint jsonCheckpoint = jsonCheckpointSerde.fromBytes(bytes);
       Map<SystemStreamPartition, String> sspOffsets = new HashMap<>();
       Map<String, List<StateCheckpointMarker>> stateCheckpoints = new HashMap<>();
 
       jsonCheckpoint.getInputOffsets().forEach((sspName, m) -> {

Review comment:
       Might not be necessary if you're planning to switch to Jackson Deserializers later.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -65,9 +61,9 @@ public void start() {
   /**
    * Commits the local state on the remote backup implementation
    * TODO BLOCKER dchen add comments / docs for what all these Map keys and value are.
-   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   * @return Committed Map of FactoryName to (Map of StoreName to StateCheckpointMarker) mappings of the committed SSPs

Review comment:
       Clean up redundant description. (Committed map of .. mappings of the committed SSPs)

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -78,28 +77,21 @@ public String getChangelogOffset() {
   /**
    * Builds the SSP to kafka offset mapping from map of store name to list of StateCheckpointMarkers
    * containing a KafkaStateCheckpointMarker
-   * @param storeToStateBackendStateMarkers Map of store name to list of StateCheckpointMarkers containing a KafkaStateCheckpointMarker
+   * @param factoryStateBackendStateMarkersMap Map of store name to list of StateCheckpointMarkers containing a

Review comment:
       Update rest of the javadocs as well.







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r595449482



##########
File path: samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
##########
@@ -116,35 +116,53 @@ public void setUp() {
   @Test
   public void testStopAndRestart() {
     List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    // double check collectors.flush
     List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
     initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
 
-    // first two are reverts for uncommitted messages from last run
-    List<String> expectedChangelogMessagesOnSecondRun =
+    // first two are reverts for uncommitted messages from last run for keys 98 and 99
+    List<String> expectedChangelogMessagesAfterSecondRun =
         Arrays.asList(null, null, "98", "99", "4", "5", "5");
     List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
     secondRun(CHANGELOG_TOPIC,
-        expectedChangelogMessagesOnSecondRun, expectedInitialStoreContentsOnSecondRun);
+        expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, CONFIGS);
+  }
+
+  @Test
+  public void testStopAndRestartCheckpointV2() {

Review comment:
       Since this is an unrelated scenario, can you extract this to a new class (CheckpointVersionIntegrationTest?).







[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540381625



##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.util.Clock;
+
+
+public interface StateBackendFactory {
+  TaskStorageBackupManager getBackupManager(TaskModel taskModel,

Review comment:
       added







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r536338915



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
 
     Checkpoint that = (Checkpoint) o;
 
-    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
-    return true;
+    return (checkpointId.equals(that.checkpointId)) &&
+        (Objects.equals(inputOffsets, that.inputOffsets)) &&
+        (Objects.equals(stateCheckpoint, that.stateCheckpoint));
   }
 
   @Override
   public int hashCode() {
-    return offsets != null ? offsets.hashCode() : 0;
+    return inputOffsets != null ? inputOffsets.hashCode() : 0;
   }
 
   @Override
   public String toString() {
-    return "Checkpoint [offsets=" + offsets + "]";
+    return "Checkpoint [inputOffsets=" + inputOffsets + "]";

Review comment:
       Print checkpointId, stateCheckpointMarkers as well?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {
   public static final String SEPARATOR = ":";
 
   private final CheckpointId checkpointId;
-  private final String offset;
+  private final String changelogOffset;
 
-  public CheckpointedChangelogOffset(CheckpointId checkpointId, String offset) {
+  public KafkaStateChangelogOffset(CheckpointId checkpointId, String changelogOffset) {
     this.checkpointId = checkpointId;
-    this.offset = offset;
+    this.changelogOffset = changelogOffset;
   }
 
-  public static CheckpointedChangelogOffset fromString(String message) {
+  public static KafkaStateChangelogOffset fromString(String message) {
     if (StringUtils.isBlank(message)) {
       throw new IllegalArgumentException("Invalid checkpointed changelog message: " + message);
     }
-    String[] checkpointIdAndOffset = message.split(":");
-    if (checkpointIdAndOffset.length != 2) {
+    String[] checkpointIdAndOffset = message.split(SEPARATOR);
+    if (checkpointIdAndOffset.length < 2 || checkpointIdAndOffset.length > 3) {
       throw new IllegalArgumentException("Invalid checkpointed changelog offset: " + message);
     }
     CheckpointId checkpointId = CheckpointId.fromString(checkpointIdAndOffset[0]);
     String offset = null;
     if (!"null".equals(checkpointIdAndOffset[1])) {
       offset = checkpointIdAndOffset[1];
     }
-    return new CheckpointedChangelogOffset(checkpointId, offset);
+
+    return new KafkaStateChangelogOffset(checkpointId, offset);
   }
 
   public CheckpointId getCheckpointId() {
     return checkpointId;
   }
 
-  public String getOffset() {
-    return offset;
+  public String getChangelogOffset() {
+    return changelogOffset;
   }
 
   @Override
   public String toString() {
-    return String.format("%s%s%s", checkpointId, SEPARATOR, offset);
+    return String.format("%s%s%s%s%s", checkpointId, SEPARATOR, changelogOffset);

Review comment:
       Remove extra %s
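
       For context, a minimal sketch of what the extra specifiers do at runtime: `String.format` throws `MissingFormatArgumentException` when the format string has more `%s` specifiers than arguments. The class name and the plain strings standing in for `CheckpointId` are assumptions made for illustration, not code from this PR.

```java
import java.util.MissingFormatArgumentException;

// Hypothetical stand-in for the toString() under review; not part of the PR.
class FormatSpecifierSketch {
  static final String SEPARATOR = ":";

  // Three specifiers for three arguments: checkpointId, separator, offset.
  static String formatOk(String checkpointId, String changelogOffset) {
    return String.format("%s%s%s", checkpointId, SEPARATOR, changelogOffset);
  }

  // Five specifiers but only three arguments, mirroring the line in the diff.
  static String formatBroken(String checkpointId, String changelogOffset) {
    return String.format("%s%s%s%s%s", checkpointId, SEPARATOR, changelogOffset);
  }

  public static void main(String[] args) {
    System.out.println(formatOk("id-1", "42"));
    try {
      formatBroken("id-1", "42");
    } catch (MissingFormatArgumentException e) {
      // The fourth specifier has no matching argument.
      System.out.println("format failed: " + e.getMessage());
    }
  }
}
```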

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +32,47 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoint;
 
   /**
    * Constructs a new checkpoint based off a map of Samza stream offsets.
    * @param offsets Map of Samza streams to their current offset.
    */
   public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+    this(CheckpointId.create(), offsets, null);
+  }
+
+  public Checkpoint(CheckpointId id, Map<SystemStreamPartition, String> inputOffsets, Map<String, List<StateCheckpointMarker>> stateCheckpoint) {
+    this.checkpointId = id;
+    this.inputOffsets = inputOffsets;
+    this.stateCheckpoint = stateCheckpoint;
+  }
+
+  /**
+   * Gets the checkpoint id for the checkpoint
+   * @return The timestamp based checkpoint identifier associated with the checkpoint
+   */
+  public CheckpointId getCheckpointId() {
+    return checkpointId;
   }
 
   /**
    * Gets a unmodifiable view of the current Samza stream offsets.
    * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
    */
-  public Map<SystemStreamPartition, String> getOffsets() {
-    return Collections.unmodifiableMap(offsets);
+  public Map<SystemStreamPartition, String> getInputOffsets() {
+    return Collections.unmodifiableMap(inputOffsets);
+  }
+
+  /**
+   * Gets the stateCheckpointMarkers
+   * @return The state checkpoint markers for the checkpoint
+   */
+  public Map<String, List<StateCheckpointMarker>> getStateCheckpointMarkers() {
+    return stateCheckpoint;

Review comment:
       Try to avoid returning nulls. Either return an empty map here (preferred) or mark method as Nullable and clarify nullability in javadoc.
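
       One way the preferred option could look, as a sketch only: normalize a null argument to an empty map in the constructor so the getter never returns null. The class name is hypothetical and `List<String>` stands in for `List<StateCheckpointMarker>`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for Checkpoint; not the PR's implementation.
class NullSafeCheckpointSketch {
  private final Map<String, List<String>> stateCheckpoints;

  NullSafeCheckpointSketch(Map<String, List<String>> stateCheckpoints) {
    // Normalize a missing map to an empty one at construction time.
    this.stateCheckpoints = stateCheckpoints == null
        ? Collections.<String, List<String>>emptyMap()
        : stateCheckpoints;
  }

  // Never returns null; callers can iterate without a null check.
  Map<String, List<String>> getStateCheckpointMarkers() {
    return Collections.unmodifiableMap(stateCheckpoints);
  }

  public static void main(String[] args) {
    NullSafeCheckpointSketch checkpoint = new NullSafeCheckpointSketch(null);
    System.out.println(checkpoint.getStateCheckpointMarkers().isEmpty());
  }
}
```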

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +32,47 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoint;

Review comment:
       Minor: stateCheckpoints (plural), here and everywhere else.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
 
     Checkpoint that = (Checkpoint) o;
 
-    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
-    return true;
+    return (checkpointId.equals(that.checkpointId)) &&
+        (Objects.equals(inputOffsets, that.inputOffsets)) &&
+        (Objects.equals(stateCheckpoint, that.stateCheckpoint));
   }
 
   @Override
   public int hashCode() {
-    return offsets != null ? offsets.hashCode() : 0;
+    return inputOffsets != null ? inputOffsets.hashCode() : 0;

Review comment:
       hashCode should also be based on checkpointId and stateCheckpoints to match equals implementation.
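
       A sketch of a hashCode that covers the same fields as equals, using `Objects.hash`. The class is a hypothetical simplification (strings in place of `CheckpointId`, `SystemStreamPartition` and `StateCheckpointMarker`), shown only to illustrate the pairing.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for Checkpoint; not the PR's implementation.
class CheckpointHashSketch {
  private final String checkpointId;
  private final Map<String, String> inputOffsets;
  private final Map<String, String> stateCheckpoints;

  CheckpointHashSketch(String checkpointId, Map<String, String> inputOffsets,
      Map<String, String> stateCheckpoints) {
    this.checkpointId = checkpointId;
    this.inputOffsets = inputOffsets;
    this.stateCheckpoints = stateCheckpoints;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof CheckpointHashSketch)) return false;
    CheckpointHashSketch that = (CheckpointHashSketch) o;
    return Objects.equals(checkpointId, that.checkpointId)
        && Objects.equals(inputOffsets, that.inputOffsets)
        && Objects.equals(stateCheckpoints, that.stateCheckpoints);
  }

  @Override
  public int hashCode() {
    // Same three fields as equals, so equal objects always hash alike.
    return Objects.hash(checkpointId, inputOffsets, stateCheckpoints);
  }

  public static void main(String[] args) {
    CheckpointHashSketch a = new CheckpointHashSketch("id-1",
        Collections.singletonMap("ssp", "5"), Collections.singletonMap("store", "m"));
    CheckpointHashSketch b = new CheckpointHashSketch("id-1",
        Collections.singletonMap("ssp", "5"), Collections.singletonMap("store", "m"));
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
  }
}
```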

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
 
     Checkpoint that = (Checkpoint) o;
 
-    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
-    return true;
+    return (checkpointId.equals(that.checkpointId)) &&
+        (Objects.equals(inputOffsets, that.inputOffsets)) &&

Review comment:
       Does Objects.equals do a deep equality comparison for Map types or is it only using default Map.equals (comparing references)?
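
       For reference, a small sketch of the behavior in question: `Objects.equals(a, b)` is null-safe and otherwise delegates to `a.equals(b)`, and the `Map.equals` contract compares entries, which in turn uses the keys' and values' own `equals`. So the comparison is only as deep as the value type's `equals`. The class names below are hypothetical; `Opaque` stands in for a marker type that does not override `equals`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical demo class; not part of the PR.
class MapEqualitySketch {
  // A value type that inherits reference equality from Object.
  static class Opaque { }

  static Map<String, List<String>> sample() {
    Map<String, List<String>> m = new HashMap<>();
    m.put("store", new ArrayList<>(Arrays.asList("marker-1", "marker-2")));
    return m;
  }

  // Distinct map instances with equal contents compare equal.
  static boolean sameContentsEqual() {
    Map<String, List<String>> a = sample();
    Map<String, List<String>> b = sample();
    return a != b && Objects.equals(a, b);
  }

  // Values without an equals override fall back to reference comparison.
  static boolean opaqueValuesEqual() {
    Map<String, Opaque> a = Collections.singletonMap("store", new Opaque());
    Map<String, Opaque> b = Collections.singletonMap("store", new Opaque());
    return Objects.equals(a, b);
  }

  public static void main(String[] args) {
    System.out.println("same contents equal: " + sameContentsEqual());
    System.out.println("opaque values equal: " + opaqueValuesEqual());
  }
}
```

       In other words, the check in this diff is content-based as long as every StateCheckpointMarker implementation defines its own equals.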

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {
   public static final String SEPARATOR = ":";
 
   private final CheckpointId checkpointId;

Review comment:
       Does this still need to contain the checkpointId? Is this for backcompat?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +32,47 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoint;
 
   /**
    * Constructs a new checkpoint based off a map of Samza stream offsets.
    * @param offsets Map of Samza streams to their current offset.
    */
   public Checkpoint(Map<SystemStreamPartition, String> offsets) {

Review comment:
       Minor: inputOffsets (param name)

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {
   public static final String SEPARATOR = ":";
 
   private final CheckpointId checkpointId;
-  private final String offset;
+  private final String changelogOffset;
 
-  public CheckpointedChangelogOffset(CheckpointId checkpointId, String offset) {
+  public KafkaStateChangelogOffset(CheckpointId checkpointId, String changelogOffset) {
     this.checkpointId = checkpointId;
-    this.offset = offset;
+    this.changelogOffset = changelogOffset;
   }
 
-  public static CheckpointedChangelogOffset fromString(String message) {
+  public static KafkaStateChangelogOffset fromString(String message) {
     if (StringUtils.isBlank(message)) {
       throw new IllegalArgumentException("Invalid checkpointed changelog message: " + message);
     }
-    String[] checkpointIdAndOffset = message.split(":");
-    if (checkpointIdAndOffset.length != 2) {
+    String[] checkpointIdAndOffset = message.split(SEPARATOR);
+    if (checkpointIdAndOffset.length < 2 || checkpointIdAndOffset.length > 3) {

Review comment:
       Why relax this check?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {
+  public static final String SEPARATOR = ";";
+  // backwards compatibility for this unstable api
+  private static final short PROTOCOL_VERSION = 0;
+
+  // blob store location id obtained after upload
+  private final String blobId;
+  // timestamp of when the upload was completed
+  private final long createdMillis;
+
+  public RemoteStoreMetadata(String blobId, long createdMillis) {
+    this.blobId = blobId;
+    this.createdMillis = createdMillis;
+  }
+
+  public String getBlobId() {
+    return blobId;
+  }
+
+  public long getCreatedMillis() {
+    return createdMillis;
+  }
+
+  public short getProtocolVersion() {
+    return PROTOCOL_VERSION;
+  }
+
+  public static RemoteStoreMetadata fromString(String message) {

Review comment:
       Do we still need this method if we have an explicit StateCheckpointMarkerSerde?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {
+  public static final String SEPARATOR = ";";
+  // backwards compatibility for this unstable api
+  private static final short PROTOCOL_VERSION = 0;

Review comment:
       Minor: Start with 1.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {
+  public static final String SEPARATOR = ";";
+  // backwards compatibility for this unstable api
+  private static final short PROTOCOL_VERSION = 0;
+
+  // blob store location id obtained after upload
+  private final String blobId;
+  // timestamp of when the upload was completed
+  private final long createdMillis;
+
+  public RemoteStoreMetadata(String blobId, long createdMillis) {
+    this.blobId = blobId;
+    this.createdMillis = createdMillis;
+  }
+
+  public String getBlobId() {
+    return blobId;
+  }
+
+  public long getCreatedMillis() {
+    return createdMillis;
+  }
+
+  public short getProtocolVersion() {
+    return PROTOCOL_VERSION;
+  }
+
+  public static RemoteStoreMetadata fromString(String message) {
+    if (StringUtils.isBlank(message)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + message);
+    }
+    String[] parts = message.split(SEPARATOR);
+    if (parts.length != 3) {
+      throw new IllegalArgumentException("Invalid checkpointed changelog offset: " + message);

Review comment:
       Fix message.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {

Review comment:
       BlobStoreStateCheckpointMarker?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarker.java
##########
@@ -17,21 +17,16 @@
  * under the License.
  */
 
-package org.apache.samza.storage
+package org.apache.samza.checkpoint;
 
-import org.apache.samza.checkpoint.CheckpointId
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.storage.StateBackendFactory;
 
-trait TaskStorageManager {
 
-  def getStore(storeName: String): Option[StorageEngine]
-
-  def flush(): Map[SystemStreamPartition, Option[String]]
-
-  def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit
-
-  def removeOldCheckpoints(checkpointId: CheckpointId): Unit
-
-  def stop(): Unit
+/**
+ * Interface for State Checkpoint Marker for all TaskStorageBackupManagers
+ */
+public interface StateCheckpointMarker {

Review comment:
       Should KafkaCheckpointMarker and BlobStoreCheckpointMarker implement this interface?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarker.java
##########
@@ -17,21 +17,16 @@
  * under the License.
  */
 
-package org.apache.samza.storage
+package org.apache.samza.checkpoint;
 
-import org.apache.samza.checkpoint.CheckpointId
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.storage.StateBackendFactory;
 
-trait TaskStorageManager {
 
-  def getStore(storeName: String): Option[StorageEngine]
-
-  def flush(): Map[SystemStreamPartition, Option[String]]
-
-  def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit
-
-  def removeOldCheckpoints(checkpointId: CheckpointId): Unit
-
-  def stop(): Unit
+/**
+ * Interface for State Checkpoint Marker for all TaskStorageBackupManagers
+ */
+public interface StateCheckpointMarker {
+  String getFactoryName();
 
-}
\ No newline at end of file
+  StateBackendFactory getFactory();

Review comment:
       Why does this need to return the actual factory instance?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskStorageAdmin.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+public interface TaskStorageAdmin {
+
+  void createResources();

Review comment:
       Also add validateResources

##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -240,6 +251,10 @@ public long getChangelogMinCompactionLagMs(String storeName) {
     return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs());
   }
 
+  public String getStateRestoreBackupManager() {
+    return get(STATE_BACKUP_MANAGER_FACTORY, DEFAULT_STATE_BACKUP_MANAGER_FACTORY);

Review comment:
       s/getStateRestoreBackupManager/getStateBackupManager

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
 
     Checkpoint that = (Checkpoint) o;
 
-    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
-    return true;
+    return (checkpointId.equals(that.checkpointId)) &&

Review comment:
       Need to include protocol version in equals and hashcode?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.util.Clock;
+
+
+public interface StateBackendFactory {
+  TaskStorageBackupManager getBackupManager(TaskModel taskModel,

Review comment:
       Minor: s/TaskStorageBackupManager/TaskBackupManager or TaskStateBackupManager, and make it consistent with TaskRestoreManager naming convention.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskStorageBackupManager.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskStorageBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskStorageBackupManager {
+
+  /**
+   * Commit operation that is synchronous to processing
+   * @param checkpointId Checkpoint id of the current commit
+   * @return The storename to checkpoint of the snapshotted local store
+   */
+  Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+  /**
+   * Commit operation that is asynchronous to message processing,
+   * @param checkpointId Checkpoint id of the current commit
+   * @param stateCheckpointMarkers The map of storename to checkpoint makers returned by the snapshot
+   * @return The future of storename to checkpoint map of the uploaded local store
+   */
+  CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Persist the state locally to the file system
+   * @param checkpointId The id of the checkpoint to be committed
+   * @param stateCheckpointMarkers Uploaded storename to checkpoints markers to be persisted locally
+   */
+  void persistToFilesystem(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);

Review comment:
       Do you need input offsets here too (e.g. to write the offsets file)? If so, it might be worth passing the entire `Checkpoint` instance.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.util.Clock;
+
+
+public interface StateBackendFactory {
+  TaskStorageBackupManager getBackupManager(TaskModel taskModel,

Review comment:
       Also, pass the jobModel, containerModel and clock as well for consistency with restore manager interface.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {

Review comment:
       IIUC, this should implement StateCheckpointMarker interface?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskStorageBackupManager.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskStorageBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * invoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskStorageBackupManager {
+
+  /**
+   * Commit operation that is synchronous to processing
+   * @param checkpointId Checkpoint id of the current commit
+   * @return The storename to checkpoint of the snapshotted local store
+   */
+  Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+  /**
+   * Commit operation that is asynchronous to message processing,
+   * @param checkpointId Checkpoint id of the current commit
+   * @param stateCheckpointMarkers The map of storename to checkpoint markers returned by the snapshot
+   * @return The future of storename to checkpoint map of the uploaded local store
+   */
+  CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Persist the state locally to the file system
+   * @param checkpointId The id of the checkpoint to be committed
+   * @param stateCheckpointMarkers Uploaded storename to checkpoints markers to be persisted locally
+   */
+  void persistToFilesystem(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Cleanup any local or remote state for obsolete checkpoint information that are older than checkpointId
+   * @param checkpointId The id of the latest successfully committed checkpoint
+   */
+  void cleanUp(CheckpointId checkpointId);

Review comment:
       Need markers here? E.g. to find index blobIds etc.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {

Review comment:
       implements StateCheckpointMarker?

##########
File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -543,6 +540,10 @@ object SamzaContainer extends Logging {
 
     storeWatchPaths.addAll(containerStorageManager.getStoreDirectoryPaths)
 
+    val stateStorageBackendFactory = {

Review comment:
       Minor: Don't need curly braces.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskStorageBackupManager.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskStorageBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * invoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskStorageBackupManager {
+
+  /**
+   * Commit operation that is synchronous to processing
+   * @param checkpointId Checkpoint id of the current commit
+   * @return The storename to checkpoint of the snapshotted local store
+   */
+  Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+  /**
+   * Commit operation that is asynchronous to message processing,
+   * @param checkpointId Checkpoint id of the current commit
+   * @param stateCheckpointMarkers The map of storename to checkpoint markers returned by the snapshot
+   * @return The future of storename to checkpoint map of the uploaded local store
+   */
+  CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Persist the state locally to the file system
+   * @param checkpointId The id of the checkpoint to be committed
+   * @param stateCheckpointMarkers Uploaded storename to checkpoints markers to be persisted locally
+   */
+  void persistToFilesystem(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Cleanup any local or remote state for obsolete checkpoint information that are older than checkpointId
+   * @param checkpointId The id of the latest successfully committed checkpoint
+   */
+  void cleanUp(CheckpointId checkpointId);
+
+  /**
+   * Used for testing as a shutdown hook to cleanup any allocated resources
+   */
+  void stop();

Review comment:
       Should also have a corresponding start method. Prefer init/close for consistency with most other Samza methods.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
##########
@@ -38,6 +42,11 @@
  */
 public interface StorageEngine {
 
+  /**
+   * Initiate storage engine
+   */
+  void init(ContainerContext containerContext, JobContext jobContext, ExternalContext externalContext);

Review comment:
       Nitpick: Order the params as External, Job, Container to follow the hierarchy. Same for the StateBackendFactory method params (JobModel, ContainerModel, TaskModel).

##########
File path: samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
##########
@@ -35,6 +38,18 @@
  * @param <V> the type of values maintained by this key-value store.
  */
 public interface KeyValueStore<K, V> {
+
+  /**
+   * Initiates the KeyValueStore
+   *
+   * @param containerContext context of the KeyValueStore's container
+   * @param jobContext context of the job the KeyValueStore is in
+   * @param externalContext any external store required for initialization
+   */
+  default void init(ContainerContext containerContext, JobContext jobContext, ExternalContext externalContext) {
+    throw new UnsupportedOperationException("init() is not supported in " + this.getClass().getName());

Review comment:
       Would it be better to make this a no-op rather than throw an exception? That way we can always call init at the right place in the store lifecycle.
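
A minimal sketch of the no-op alternative, assuming a simplified store interface. `SketchStore`, `SketchContext`, `LegacyStore`, and `InitializingStore` are hypothetical stand-ins for illustration, not Samza types:

```java
// Hypothetical stand-in for the context arguments; not a Samza class.
final class SketchContext { }

// Hypothetical, simplified store interface illustrating the suggestion.
interface SketchStore {
  // No-op default: stores that need no setup inherit this, so the
  // caller can invoke init() unconditionally in the store lifecycle.
  default void init(SketchContext context) { }

  String get(String key);
}

// A store written before init() existed keeps compiling and working.
final class LegacyStore implements SketchStore {
  @Override
  public String get(String key) {
    return "value-for-" + key;
  }
}

// A store that needs setup overrides init().
final class InitializingStore implements SketchStore {
  private boolean initialized = false;

  @Override
  public void init(SketchContext context) {
    initialized = true;
  }

  @Override
  public String get(String key) {
    // Returns null until init() has been called.
    return initialized ? "ready-" + key : null;
  }
}
```

With a throwing default, the call site would have to know which stores support init; with the no-op default it can call init on every store.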







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r594675667



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -211,7 +211,7 @@ private boolean isPersistedStore(String storeName) {
 
   private void validateStoreConfiguration(Map<String, StorageEngine> stores) {
     stores.forEach((storeName, storageEngine) -> {
-      if (storageEngine.getStoreProperties().isLoggedStore()) {
+      if (storageEngine.getStoreProperties().isDurableStore()) {

Review comment:
       This should still be isLoggedStore. Side input stores are durable; this check only verifies that they don't also have a changelog (since side input == changelog).







[GitHub] [samza] dxichen commented on pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on pull request #1429:
URL: https://github.com/apache/samza/pull/1429#issuecomment-690771293


   @mynameborat Thanks for the reminder! Added the main design doc to [SAMZA-2591](https://issues.apache.org/jira/browse/SAMZA-2590)
   as well as an excerpt pertaining to this initial PR.





[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540375692



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
 
     Checkpoint that = (Checkpoint) o;
 
-    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
-    return true;
+    return (checkpointId.equals(that.checkpointId)) &&
+        (Objects.equals(inputOffsets, that.inputOffsets)) &&

Review comment:
       Map.equals performs a deep comparison: https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#equals-java.lang.Object-
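
For illustration, a small self-contained check of that behaviour. The class and method names (`MapEqualsSketch`, `sameContents`) are made up for this example and are not part of the PR:

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper; only demonstrates how Map equality behaves.
final class MapEqualsSketch {
  static <K, V> boolean sameContents(Map<K, V> a, Map<K, V> b) {
    // Objects.equals is null-safe and otherwise delegates to a.equals(b).
    // Map.equals compares the entry sets, and each value is compared with
    // its own equals(), so nested lists are compared element by element.
    return Objects.equals(a, b);
  }
}
```

Two distinct map instances, even of different implementations such as HashMap and TreeMap, compare equal as long as they hold equal entries.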







[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540379199



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarker.java
##########
@@ -17,21 +17,16 @@
  * under the License.
  */
 
-package org.apache.samza.storage
+package org.apache.samza.checkpoint;
 
-import org.apache.samza.checkpoint.CheckpointId
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.storage.StateBackendFactory;
 
-trait TaskStorageManager {
 
-  def getStore(storeName: String): Option[StorageEngine]
-
-  def flush(): Map[SystemStreamPartition, Option[String]]
-
-  def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit
-
-  def removeOldCheckpoints(checkpointId: CheckpointId): Unit
-
-  def stop(): Unit
+/**
+ * Interface for State Checkpoint Marker for all TaskStorageBackupManagers
+ */
+public interface StateCheckpointMarker {

Review comment:
       KafkaStateCheckpointMarker and BlobStoreCheckpointMarker now implement this.







[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540381478



##########
File path: samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
##########
@@ -38,6 +42,11 @@
  */
 public interface StorageEngine {
 
+  /**
+   * Initiate storage engine
+   */
+  void init(ContainerContext containerContext, JobContext jobContext, ExternalContext externalContext);

Review comment:
       changed







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r561204115



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());
+  }
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   * @param id CheckpointId associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of local state store names and StateCheckpointMarkers for each state backend system
+   */
+  public Checkpoint(CheckpointId id, Map<SystemStreamPartition, String> inputOffsets, Map<String, List<StateCheckpointMarker>> stateCheckpoints) {
+    this.checkpointId = id;
+    this.inputOffsets = inputOffsets;
+    this.stateCheckpoints = stateCheckpoints;

Review comment:
       Create an immutable copy of the map.
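
One way this could look using only JDK collections, as a sketch. `MarkerHolder` is a hypothetical class and uses `String` in place of StateCheckpointMarker to stay self-contained:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder class; only the copying pattern is the point here.
final class MarkerHolder {
  private final Map<String, List<String>> markers;

  MarkerHolder(Map<String, List<String>> markers) {
    // Copy each list and the map itself, so later mutation of the
    // caller's collections cannot change this object's state.
    Map<String, List<String>> copy = new HashMap<>();
    for (Map.Entry<String, List<String>> e : markers.entrySet()) {
      copy.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
    }
    this.markers = Collections.unmodifiableMap(copy);
  }

  Map<String, List<String>> getMarkers() {
    // Already an unmodifiable private copy, so it can be returned directly.
    return markers;
  }
}
```

Wrapping the caller's map in `Collections.unmodifiableMap` without copying would only give a read-only view; changes made through the original reference would still show through.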

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());
+  }
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   * @param id CheckpointId associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of local state store names and StateCheckpointMarkers for each state backend system
+   */
+  public Checkpoint(CheckpointId id, Map<SystemStreamPartition, String> inputOffsets, Map<String, List<StateCheckpointMarker>> stateCheckpoints) {
+    this.checkpointId = id;
+    this.inputOffsets = inputOffsets;
+    this.stateCheckpoints = stateCheckpoints;
+  }
+
+  /**
+   * Gets the checkpoint id for the checkpoint
+   * @return The timestamp based checkpoint identifier associated with the checkpoint
+   */
+  public CheckpointId getCheckpointId() {
+    return checkpointId;
   }
 
   /**
    * Gets a unmodifiable view of the current Samza stream offsets.
    * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
    */
-  public Map<SystemStreamPartition, String> getOffsets() {
-    return Collections.unmodifiableMap(offsets);
+  public Map<SystemStreamPartition, String> getInputOffsets() {
+    return Collections.unmodifiableMap(inputOffsets);
+  }
+
+  /**
+   * Gets the stateCheckpointMarkers
+   * @return The state checkpoint markers for the checkpoint
+   */
+  public Map<String, List<StateCheckpointMarker>> getStateCheckpointMarkers() {
+    return stateCheckpoints;

Review comment:
       Minor: Return immutable copy.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -41,6 +41,14 @@ public CheckpointId(long millis, long nanos) {
     this.nanos = nanos;

Review comment:
       Can this be a private constructor so that the only way to create checkpoint IDs is through the public static create() method?
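
A sketch of the pattern being asked for. `SketchId` is a hypothetical class, not the actual CheckpointId, and the way the nanos component is derived here is an assumption made for the example:

```java
// Hypothetical id class mirroring the suggestion; not the Samza CheckpointId.
final class SketchId {
  private final long millis;
  private final long nanos;

  // Private: callers cannot construct ids with arbitrary values.
  private SketchId(long millis, long nanos) {
    this.millis = millis;
    this.nanos = nanos;
  }

  // The single entry point for creating new ids.
  static SketchId create() {
    // nanoTime() has an arbitrary origin and may be negative; floorMod
    // keeps the sub-millisecond component in the range [0, 1_000_000).
    return new SketchId(System.currentTimeMillis(), Math.floorMod(System.nanoTime(), 1_000_000L));
  }

  long getMillis() {
    return millis;
  }

  long getNanos() {
    return nanos;
  }
}
```

If ids also need to be rebuilt from their serialized form, that path would need its own static method in this design, since the constructor is no longer reachable from outside the class.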

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarkerSerde.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+
+
+public class StateCheckpointMarkerSerde<T extends StateCheckpointMarker> {
+  private static final short PROTOCOL_VERSION = 1;
+  private static final String SCM_SEPARATOR = ":";
+
+  public String serialize(T payload, StateCheckpointPayloadSerde<T> serde) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(PROTOCOL_VERSION);
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.getClass().getName());
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.serialize(payload));
+
+    return builder.toString();
+  }
+
+  public T deserializePayload(String serializedSCM) {
+    if (StringUtils.isBlank(serializedSCM)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + serializedSCM);
+    }
+    String[] parts = serializedSCM.split(SCM_SEPARATOR);
+    if (parts.length != 3) {
+      throw new IllegalArgumentException("Invalid RemoteStore Metadata offset: " + serializedSCM);

Review comment:
       s/RemoteStore Metadata offset/state checkpoint marker

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarkerSerde.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+
+
+public class StateCheckpointMarkerSerde<T extends StateCheckpointMarker> {
+  private static final short PROTOCOL_VERSION = 1;
+  private static final String SCM_SEPARATOR = ":";
+
+  public String serialize(T payload, StateCheckpointPayloadSerde<T> serde) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(PROTOCOL_VERSION);
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.getClass().getName());
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.serialize(payload));
+
+    return builder.toString();
+  }
+
+  public T deserializePayload(String serializedSCM) {
+    if (StringUtils.isBlank(serializedSCM)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + serializedSCM);
+    }
+    String[] parts = serializedSCM.split(SCM_SEPARATOR);

Review comment:
       The serialized payload on line 36 may contain the separator as well. It's better to split into a fixed-length array (3 parts), and let the payload serde throw an exception if parts[2] cannot be parsed correctly.
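
A sketch of the fixed-length split, using the limit argument of `String.split`. `MarkerSplitSketch` and `splitMarker` are hypothetical names, and the "version:serdeClassName:payload" layout is taken from the serialize method in the diff above:

```java
// Hypothetical parser illustrating the fixed-length split; not the PR's serde.
final class MarkerSplitSketch {
  private static final String SEPARATOR = ":";

  // Splits "version:serdeClassName:payload" into exactly three parts.
  static String[] splitMarker(String serialized) {
    // A limit of 3 stops splitting after the second separator, so any
    // further separators stay inside the payload (parts[2]).
    String[] parts = serialized.split(SEPARATOR, 3);
    if (parts.length != 3) {
      throw new IllegalArgumentException("Invalid state checkpoint marker: " + serialized);
    }
    return parts;
  }
}
```

For an input such as "1:com.example.Serde:cid:offset:42", parts[2] is "cid:offset:42", and it is then up to the payload serde to accept or reject it.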

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * invoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManager instance
+   */
+  default void init() {}

Review comment:
       Why default?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());

Review comment:
       Why a fake placeholder id instead of a valid but unused id?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
##########
@@ -35,6 +38,16 @@
  * @param <V> the type of values maintained by this key-value store.
  */
 public interface KeyValueStore<K, V> {
+
+  /**
+   * Initiates the KeyValueStore
+   *
+   * @param containerContext context of the KeyValueStore's container
+   * @param jobContext context of the job the KeyValueStore is in
+   * @param externalContext any external store required for initialization
+   */
+  default void init(ContainerContext containerContext, JobContext jobContext, ExternalContext externalContext) { }

Review comment:
       Minor: Reverse order (External, Job, Container) to match scope.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)

Review comment:
       Update message to mention commit instead of flushing stores. Flushing stores step should go into the commit / storage manager instead.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
+    // Commit state and input checkpoints offsets atomically

Review comment:
       Instead of 'commit' which is overloaded in this context, let's explain that this is "Write input offsets and state checkpoint markers to the checkpoint topic atomically"

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
+    // Commit state and input checkpoints offsets atomically
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
-    if (storageManager != null) {
+    // Perform cleanup on unused checkpoints
+    if (commitManager != null) {
       trace("Remove old checkpoint stores for taskName: %s" format taskName)

Review comment:
       Generalize message and just say 'Cleaning up old checkpoint state'. Also include checkpointId in the trace message.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
+    // Commit state and input checkpoints offsets atomically
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
-    if (storageManager != null) {
+    // Perform cleanup on unused checkpoints
+    if (commitManager != null) {

Review comment:
       Can this null check be removed?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);
+        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
+      }
+
+      // TODO: call commit on multiple backup managers when available

Review comment:
       I'd recommend handling this (multiple backup managers per store) now to make sure all the interfaces, logging etc are general and consistent.
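
   A rough sketch of what fanning out to several backup managers could look like. `Marker` and `BackupManager` are simplified, hypothetical stand-ins for StateCheckpointMarker and TaskBackupManager (checkpoint ids are plain strings here), and the merged shape mirrors the Map<String, List<StateCheckpointMarker>> that commit() already returns:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical stand-ins for StateCheckpointMarker and TaskBackupManager.
interface Marker {
  String describe();
}

interface BackupManager {
  Map<String, Marker> snapshot(String checkpointId);

  CompletableFuture<Map<String, Marker>> upload(String checkpointId, Map<String, Marker> snapshot);
}

class MultiBackupCommit {
  // Snapshots and uploads on every backup manager, then merges the per-manager
  // results into storeName -> list of markers (one entry per backend).
  static CompletableFuture<Map<String, List<Marker>>> commit(
      String checkpointId, List<BackupManager> managers) {
    List<CompletableFuture<Map<String, Marker>>> uploads = managers.stream()
        .map(m -> m.upload(checkpointId, m.snapshot(checkpointId)))
        .collect(Collectors.toList());

    return CompletableFuture.allOf(uploads.toArray(new CompletableFuture[0]))
        .thenApply(ignored -> {
          Map<String, List<Marker>> merged = new HashMap<>();
          for (CompletableFuture<Map<String, Marker>> upload : uploads) {
            // join() does not block here: allOf has already completed.
            upload.join().forEach((store, marker) ->
                merged.computeIfAbsent(store, k -> new ArrayList<>()).add(marker));
          }
          return merged;
        });
  }
}
```

   Returning the future rather than blocking on it would also leave room for the async TODO in commit(); the caller decides where to apply the timeout.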

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  private static SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");

Review comment:
       s/defaultFileDir/defaultStoreBaseDir

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {

Review comment:
       Can this be removed in this PR or do we need to keep this around for backwards compatibility?

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -20,62 +20,70 @@
 package org.apache.samza.storage
 
 import java.io._
+import java.util
+import java.util.concurrent.CompletableFuture
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
-import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{CheckpointId, StateCheckpointMarker}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Manage all the storage engines for a given task
  */
-class NonTransactionalStateTaskStorageManager(
+class KafkaNonTransactionalStateTaskBackupManager(
   taskName: TaskName,
-  containerStorageManager: ContainerStorageManager,
-  storeChangelogs: Map[String, SystemStream] = Map(),
+  taskStores: util.Map[String, StorageEngine],
+  storeChangelogs: util.Map[String, SystemStream] = new util.HashMap[String, SystemStream](),
   systemAdmins: SystemAdmins,
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging with TaskStorageManager {
+  partition: Partition) extends Logging with TaskBackupManager {
 
   private val storageManagerUtil = new StorageManagerUtil
-  private val persistedStores = containerStorageManager.getAllStores(taskName).asScala
+  private val persistedStores = taskStores.asScala
     .filter { case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk }
 
-  def getStore(storeName: String): Option[StorageEngine] =  JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
-
-  def flush(): Map[SystemStreamPartition, Option[String]] = {
+  override def snapshot(checkpointId: CheckpointId): util.Map[String, StateCheckpointMarker] = {
     debug("Flushing stores.")
-    containerStorageManager.getAllStores(taskName).asScala.values.foreach(_.flush)
+    taskStores.asScala.values.foreach(_.flush)
     val newestChangelogSSPOffsets = getNewestChangelogSSPOffsets()
-    writeChangelogOffsetFiles(newestChangelogSSPOffsets)
+    writeChangelogOffsetFiles(KafkaStateCheckpointMarker
+      .stateCheckpointMarkerToSSPmap(newestChangelogSSPOffsets))
     newestChangelogSSPOffsets
   }
 
-  override def checkpoint(checkpointId: CheckpointId,
-    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
+  override def upload(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): CompletableFuture[util.Map[String, StateCheckpointMarker]] = {
+     CompletableFuture.completedFuture(stateCheckpointMarkers)
+  }
+
+  override def persistToFilesystem(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): Unit = {}
 
-  override def removeOldCheckpoints(checkpointId: CheckpointId): Unit = {}
+  override def cleanUp(checkpointId: CheckpointId): Unit = {}
 
   @VisibleForTesting
-  def stop() {
+  def close() {

Review comment:
       override

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaTransactionalStateTaskBackupManager.scala
##########
@@ -73,10 +80,11 @@ class TransactionalStateTaskStorageManager(
       }}
       .toMap
 
-    writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, newestChangelogOffsets)
+    writeChangelogOffsetFiles(checkpointPaths, storeChangelogs,
+       KafkaStateCheckpointMarker.stateCheckpointMarkerToSSPmap(stateCheckpointMarkers).asScala)
   }
 
-  def removeOldCheckpoints(latestCheckpointId: CheckpointId): Unit = {
+  def cleanUp(latestCheckpointId: CheckpointId): Unit = {

Review comment:
       override def. Same for the other overridden methods.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null

Review comment:
       s/newestStateCheckpointMakers/stateCheckpointMarkers.
   The 'newest' was in the name earlier to indicate that for Kafka changelog topics we need to flush them first to get the latest offset. Not relevant in this class anymore.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}
+
+  /**
+   * Commit operation that is synchronous to processing
+   * @param checkpointId Checkpoint id of the current commit
+   * @return The storename to checkpoint of the snapshotted local store
+   */
+  Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+  /**
+   * Commit operation that is asynchronous to message processing,
+   * @param checkpointId Checkpoint id of the current commit
+   * @param stateCheckpointMarkers The map of storename to checkpoint makers returned by the snapshot
+   * @return The future of storename to checkpoint map of the uploaded local store
+   */
+  CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Persist the state locally to the file system
+   * @param checkpointId The id of the checkpoint to be committed
+   * @param stateCheckpointMarkers Uploaded storename to checkpoints markers to be persisted locally
+   */
+  void persistToFilesystem(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Cleanup any local or remote state for obsolete checkpoint information that are older than checkpointId
+   * @param checkpointId The id of the latest successfully committed checkpoint
+   */
+  void cleanUp(CheckpointId checkpointId);
+
+  /**
+   * Used for testing as a shutdown hook to cleanup any allocated resources

Review comment:
       Let's remove "used for testing" and make this a part of the public interface to mirror init.
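
   One possible shape, sketched on a trimmed-down interface. `LifecycleBackupManager` and `RecordingBackupManager` are hypothetical names, and making close() a default no-op (like init()) is an assumption, not something the PR settles:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down lifecycle interface; not the PR's TaskBackupManager.
interface LifecycleBackupManager {
  /** Sets up any resources needed before the first commit. */
  default void init() { }

  /** Releases resources acquired in init(); part of the regular lifecycle, not test-only. */
  default void close() { }
}

// Example implementation that records lifecycle calls in order.
class RecordingBackupManager implements LifecycleBackupManager {
  final List<String> events = new ArrayList<>();

  @Override
  public void init() {
    events.add("init");
  }

  @Override
  public void close() {
    events.add("close");
  }
}
```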

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null

Review comment:
       Will the commit manager ever be null? If not, instead of a mutable nullable variable, just return an empty map from commitManager if there are no state checkpoint markers.
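
   A minimal illustration of the empty-map idea. `EmptyMarkersSketch` is a hypothetical stand-in for the tail end of commitManager.commit(), with markers reduced to strings:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

class EmptyMarkersSketch {
  // Stand-in for the return path of commit(): when nothing was uploaded,
  // hand back an immutable empty map so callers never have to null-check.
  static Map<String, List<String>> commit(Map<String, List<String>> uploaded) {
    return uploaded == null ? Collections.emptyMap() : uploaded;
  }
}
```

   With that in place the caller could hold the result in a val and pass it straight to the Checkpoint constructor.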

##########
File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -52,7 +52,8 @@ import org.apache.samza.util.{Util, _}
 import org.apache.samza.SamzaException
 import org.apache.samza.clustermanager.StandbyTaskUtil
 
-import scala.collection.JavaConversions
+import scala.collection.JavaConversions.mapAsScalaMap

Review comment:
       JavaConversions is deprecated. Use JavaConverters instead.
   
   https://stackoverflow.com/questions/8301947/what-is-the-difference-between-javaconverters-and-javaconversions-in-scala

##########
File path: samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker implements StateCheckpointMarker {
+  private static final short PROTO_VERSION = 1;
+  private static final String FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  private static final String SEPARATOR = ";";
+
+  // One offset per SSP
+  private final SystemStreamPartition ssp;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition ssp, String changelogOffset) {
+    this.ssp = ssp;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static short getProtocolVersion() {
+    return PROTO_VERSION;
+  }
+
+  public SystemStreamPartition getSsp() {
+    return ssp;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  @Override
+  public String getFactoryName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;
+    return Objects.equals(ssp, that.ssp) &&
+        Objects.equals(changelogOffset, that.changelogOffset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(PROTO_VERSION, ssp, changelogOffset);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s%s%s%s%s%s%s%s%s", PROTO_VERSION, SEPARATOR,
+      ssp.getSystem(), SEPARATOR, ssp.getStream(), SEPARATOR, ssp.getPartition(), SEPARATOR, changelogOffset);
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String message) {
+    if (StringUtils.isBlank(message)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + message);
+    }
+    String[] payload = message.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 5) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker argument count: " + message);
+    }
+    if (Short.parseShort(payload[0]) != PROTO_VERSION) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker protocol version: " + message);
+    }
+    Partition partition = new Partition(Integer.parseInt(payload[3]));
+    String offset = null;
+    if (!"null".equals(payload[4])) {
+      offset = payload[4];
+    }
+
+    return new KafkaStateCheckpointMarker(new SystemStreamPartition(payload[1], payload[2], partition), offset);
+  }
+
+  public static Map<SystemStreamPartition, Option<String>> stateCheckpointMarkerToSSPmap(Map<String, StateCheckpointMarker> markers) {

Review comment:
       Javadoc for public method.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -224,8 +228,8 @@ class TaskInstance(
     val allCheckpointOffsets = new java.util.HashMap[SystemStreamPartition, String]()
     val inputCheckpoint = offsetManager.buildCheckpoint(taskName)
     if (inputCheckpoint != null) {
-      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getOffsets))
-      allCheckpointOffsets.putAll(inputCheckpoint.getOffsets)
+      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getInputOffsets))
+      allCheckpointOffsets.putAll(inputCheckpoint.getInputOffsets)

Review comment:
       It looks like we don't need a separate allCheckpointOffsets map anymore and inputCheckpoint will suffice.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;

Review comment:
       Does this need to be a per store map?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);
+        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
+      }
+
+      // TODO: call commit on multiple backup managers when available
+      return mergeCheckpoints(uploadMap);
+    } catch (TimeoutException e) {
+      throw new SamzaException("Upload timed out, commitTimeoutMs: " + COMMIT_TIMEOUT_MS, e);
+    } catch (Exception e) {
+      throw new SamzaException("Upload commit portion could not be completed", e);

Review comment:
       Include store and task information in error message.
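
   For example, something along these lines. `CommitErrorMessages` is a hypothetical helper and the exact wording is only a suggestion; the resulting string would be passed to the SamzaException in each catch block:

```java
import java.util.Set;

class CommitErrorMessages {
  // Builds an error message that carries the task, checkpoint and store context
  // needed to locate a failed upload.
  static String uploadFailed(String taskName, String checkpointId, Set<String> storeNames) {
    return String.format("Upload failed for task: %s, checkpoint id: %s, stores: %s",
        taskName, checkpointId, storeNames);
  }
}
```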

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -73,8 +76,9 @@ class TaskInstance(
 
   private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
     (storeName: String) => {
-      if (storageManager != null && storageManager.getStore(storeName).isDefined) {
-        storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]]
+      if (containerStorageManager != null) {
+        val storeOption = JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption

Review comment:
       Probably don't need to convert the Java Optional to a Scala Option here just to call isDefined.
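
   java.util.Optional already exposes isPresent and get, so the lookup result can be used as-is. A small Java sketch of the idea: `StoreLookupSketch` is hypothetical, its in-memory map stands in for containerStorageManager.getStore(taskName, storeName), and returning null for a missing store is an assumption since the rest of the hunk is not shown:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class StoreLookupSketch {
  private final Map<String, Object> stores = new HashMap<>();

  void register(String storeName, Object store) {
    stores.put(storeName, store);
  }

  // Stand-in for containerStorageManager.getStore(taskName, storeName).
  Optional<Object> getStore(String storeName) {
    return Optional.ofNullable(stores.get(storeName));
  }

  // Uses the java.util.Optional directly; no conversion to another option type.
  Object storeOrNull(String storeName) {
    Optional<Object> store = getStore(storeName);
    return store.isPresent() ? store.get() : null;
  }
}
```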

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))

Review comment:
       Update trace message, log state checkpoint markers as well.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);

Review comment:
       Does this need to be thread safe? Same for method below.
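
       If it does, one option is to make the check-then-create step atomic. A minimal sketch, assuming a generic holder (the hypothetical LazyCache below) in place of the static StreamMetadataCache and SSPMetadataCache fields:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical holder, for illustration only; stands in for the static cache fields.
final class LazyCache<T> {
  private T instance; // guarded by the monitor of this LazyCache

  // synchronized makes the null check and the assignment one atomic step,
  // so the factory runs at most once even with concurrent callers.
  synchronized T getOrCreate(Supplier<T> factory) {
    if (instance == null) {
      instance = factory.get();
    }
    return instance;
  }

  public static void main(String[] args) throws InterruptedException {
    LazyCache<Object> cache = new LazyCache<>();
    AtomicInteger created = new AtomicInteger();
    Thread[] threads = new Thread[8];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(() -> cache.getOrCreate(() -> {
        created.incrementAndGet();
        return new Object();
      }));
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    // prints: instances created: 1
    System.out.println("instances created: " + created.get());
  }
}
```

       Without the synchronized keyword, two threads could both observe null and each create an instance.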

##########
File path: samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
##########
@@ -31,6 +31,7 @@ import java.util.Optional
 
 import com.google.common.annotations.VisibleForTesting
 import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.context.{ContainerContext, Context, ExternalContext, JobContext}

Review comment:
       Unused import?

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {

Review comment:
       Move private methods below public methods.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  private static SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition(),
+          taskModel.getTaskMode(), new StorageManagerUtil());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Map<String, SystemStream> filteredStoreChangelogs = ContainerStorageManager
+        .getChangelogSystemStreams(containerModel, storeChangelogs, null);
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+    File nonLoggedStoreBaseDir = SamzaContainer.getNonLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          taskModel,
+          taskStores,
+          filteredStoreChangelogs,
+          systemAdmins,
+          null, // TODO @dchen have the restore managers create and manage Kafka consume lifecycle
+          KafkaChangelogStateBackendFactory
+              .getSspCache(systemAdmins, clock, Collections.emptySet()),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          taskModel,
+          filteredStoreChangelogs,
+          taskStores,
+          systemAdmins,
+          KafkaChangelogStateBackendFactory.getStreamCache(systemAdmins, clock),
+          null, // TODO  @dchen have the restore managers create and manage Kafka consume lifecycle

Review comment:
       What does this mean? Is this required for this PR to work E2E with existing jobs?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {

Review comment:
       Let's include some more details in the class and method javadocs explaining what these concepts (snapshots, uploads, cleanup etc.) are. Let's assume that the reader has not read the design doc and is not familiar with the new commit process.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review comment:
       It'll be better not to include the commit timeout in this version until we have the concurrency handled. Otherwise large commits might time out and fail the job instead of just stalling processing for some time.
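
       A minimal sketch of waiting without a timeout, assuming String values stand in for StateCheckpointMarker and IllegalStateException stands in for SamzaException. The class name is hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper, for illustration only; not part of this PR.
final class BlockingUploadWait {
  private BlockingUploadWait() { }

  // Blocks until the upload completes, however long that takes.
  // join() has no timeout, so a slow upload stalls the caller instead of failing it.
  static Map<String, String> awaitUpload(CompletableFuture<Map<String, String>> uploadFuture) {
    try {
      return uploadFuture.join();
    } catch (CompletionException e) {
      // Unwrap so the caller sees the underlying upload failure as the cause.
      throw new IllegalStateException("Upload could not be completed", e.getCause());
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Map<String, String>> done =
        CompletableFuture.completedFuture(Collections.singletonMap("store-a", "42"));
    // prints: {store-a=42}
    System.out.println(awaitUpload(done));
  }
}
```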

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -20,62 +20,70 @@
 package org.apache.samza.storage
 
 import java.io._
+import java.util
+import java.util.concurrent.CompletableFuture
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
-import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{CheckpointId, StateCheckpointMarker}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Manage all the storage engines for a given task
  */
-class NonTransactionalStateTaskStorageManager(
+class KafkaNonTransactionalStateTaskBackupManager(
   taskName: TaskName,
-  containerStorageManager: ContainerStorageManager,
-  storeChangelogs: Map[String, SystemStream] = Map(),
+  taskStores: util.Map[String, StorageEngine],
+  storeChangelogs: util.Map[String, SystemStream] = new util.HashMap[String, SystemStream](),
   systemAdmins: SystemAdmins,
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging with TaskStorageManager {
+  partition: Partition) extends Logging with TaskBackupManager {
 
   private val storageManagerUtil = new StorageManagerUtil
-  private val persistedStores = containerStorageManager.getAllStores(taskName).asScala
+  private val persistedStores = taskStores.asScala
     .filter { case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk }
 
-  def getStore(storeName: String): Option[StorageEngine] =  JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
-
-  def flush(): Map[SystemStreamPartition, Option[String]] = {
+  override def snapshot(checkpointId: CheckpointId): util.Map[String, StateCheckpointMarker] = {
     debug("Flushing stores.")
-    containerStorageManager.getAllStores(taskName).asScala.values.foreach(_.flush)
+    taskStores.asScala.values.foreach(_.flush)
     val newestChangelogSSPOffsets = getNewestChangelogSSPOffsets()
-    writeChangelogOffsetFiles(newestChangelogSSPOffsets)
+    writeChangelogOffsetFiles(KafkaStateCheckpointMarker
+      .stateCheckpointMarkerToSSPmap(newestChangelogSSPOffsets))
     newestChangelogSSPOffsets
   }
 
-  override def checkpoint(checkpointId: CheckpointId,
-    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
+  override def upload(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): CompletableFuture[util.Map[String, StateCheckpointMarker]] = {
+     CompletableFuture.completedFuture(stateCheckpointMarkers)
+  }
+
+  override def persistToFilesystem(checkpointId: CheckpointId,

Review comment:
       Why is this a no-op? Isn't this what should be calling `writeChangelogOffsetFiles`?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);

Review comment:
       Trace log the returned SCMs (StateCheckpointMarkers).

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);
+        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
+      }
+
+      // TODO: call commit on multiple backup managers when available
+      return mergeCheckpoints(uploadMap);
+    } catch (TimeoutException e) {
+      throw new SamzaException("Upload timed out, commitTimeoutMs: " + COMMIT_TIMEOUT_MS, e);
+    } catch (Exception e) {
+      throw new SamzaException("Upload commit portion could not be completed", e);
+    }
+  }
+
+  public void cleanUp(CheckpointId checkpointId) {
+    storageBackupManager.cleanUp(checkpointId);
+  }
+
+  private Map<String, List<StateCheckpointMarker>> mergeCheckpoints(Map<String, StateCheckpointMarker>... stateCheckpoints) {
+    if (stateCheckpoints == null || stateCheckpoints.length < 1) {
+      return null;
+    }
+    Map<String, StateCheckpointMarker> firstCheckpoint = stateCheckpoints[0];
+    if (firstCheckpoint == null) {
+      return null;

Review comment:
       Return empty collections instead of nulls wherever applicable. Avoids accidental NPEs.
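
       A minimal sketch of a merge that never returns null, assuming a generic marker type M in place of StateCheckpointMarker. The merge logic here is a hypothetical illustration and may differ from the rest of mergeCheckpoints in the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only; not the PR's implementation.
final class CheckpointMerger {
  private CheckpointMerger() { }

  // Groups markers by store name across all given checkpoints.
  // Missing or null input yields an empty map rather than null.
  @SafeVarargs
  static <M> Map<String, List<M>> merge(Map<String, M>... checkpoints) {
    if (checkpoints == null || checkpoints.length == 0) {
      return Collections.emptyMap();
    }
    Map<String, List<M>> merged = new HashMap<>();
    for (Map<String, M> checkpoint : checkpoints) {
      if (checkpoint == null) {
        continue; // skip backends that produced nothing
      }
      checkpoint.forEach((store, marker) ->
          merged.computeIfAbsent(store, k -> new ArrayList<>()).add(marker));
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, List<String>> none = merge();
    // prints: true
    System.out.println(none.isEmpty());
  }
}
```

       Callers can then iterate over or query the result without a null check.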

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);

Review comment:
       Clarify log message for what this step is doing ("persisting ..." instead of more general "checkpointing ...")

##########
File path: samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker implements StateCheckpointMarker {
+  private static final short PROTO_VERSION = 1;
+  private static final String FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  private static final String SEPARATOR = ";";
+
+  // One offset per SSP
+  private final SystemStreamPartition ssp;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition ssp, String changelogOffset) {
+    this.ssp = ssp;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static short getProtocolVersion() {
+    return PROTO_VERSION;
+  }
+
+  public SystemStreamPartition getSsp() {
+    return ssp;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  @Override
+  public String getFactoryName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public boolean equals(Object o) {

Review comment:
       Minor: Move equals/hashcode/toString below other public methods.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());

Review comment:
       Minor: Use Collections.emptyMap() (the type-safe form of Collections.EMPTY_MAP) or another immutable empty map so that this cannot be accidentally modified.
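
       A minimal sketch of this suggestion, not the PR's code: the class name DefaultMarkersSketch and the String value types are assumptions chosen to keep the example self-contained. It shows a delegating constructor that passes an immutable empty map as the default.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for Checkpoint; value types are simplified to String.
final class DefaultMarkersSketch {
  private final Map<String, String> inputOffsets;
  private final Map<String, String> stateMarkers;

  DefaultMarkersSketch(Map<String, String> inputOffsets) {
    // Collections.emptyMap() is type-safe and immutable: put() on it throws
    // UnsupportedOperationException, so the default cannot be modified by accident.
    this(inputOffsets, Collections.emptyMap());
  }

  DefaultMarkersSketch(Map<String, String> inputOffsets, Map<String, String> stateMarkers) {
    this.inputOffsets = inputOffsets;
    this.stateMarkers = stateMarkers;
  }

  Map<String, String> getStateMarkers() {
    return stateMarkers;
  }
}
```

       With `new HashMap<>()` as the default, any caller holding the returned map could add entries to it; with the immutable default such a write fails immediately.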

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {

Review comment:
       Clarify in the Javadoc or in an inline comment at the call site what we're doing here (sharing the metadata cache across all state backend instances).

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}

Review comment:
       Do we need to pass in the Context classes to init?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarkerSerde.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+
+
+public class StateCheckpointMarkerSerde<T extends StateCheckpointMarker> {
+  private static final short PROTOCOL_VERSION = 1;
+  private static final String SCM_SEPARATOR = ":";
+
+  public String serialize(T payload, StateCheckpointPayloadSerde<T> serde) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(PROTOCOL_VERSION);
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.getClass().getName());
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.serialize(payload));
+
+    return builder.toString();
+  }
+
+  public T deserializePayload(String serializedSCM) {
+    if (StringUtils.isBlank(serializedSCM)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + serializedSCM);
+    }
+    String[] parts = serializedSCM.split(SCM_SEPARATOR);
+    if (parts.length != 3) {
+      throw new IllegalArgumentException("Invalid RemoteStore Metadata offset: " + serializedSCM);

Review comment:
       Also let's make sure the call site logs/includes the source task + store name information when this exception happens.







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r585905942



##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -60,10 +61,12 @@ default void start(Checkpoint checkpoint) {}
    * @param stateCheckpointMarkers The map of storename to checkpoint makers returned by the snapshot
    * @return The future of storename to checkpoint map of the uploaded local store
    */
-  CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+  CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId checkpointId,
+      Map<String, StateCheckpointMarker> stateCheckpointMarkers);
 
   /**
-   * Persist the state locally to the file system
+   * Occurs after the state has been persisted. Writes a copy of the persisted StateCheckpointMarkers from

Review comment:
       Should this be called independently by each storage backend (resulting in many different "OFFSET-V2" equivalent files), or should this be called once by TaskInstance / TaskCommitManager after collecting all the inputOffsets and SCMs and creating the checkpoint?







[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540377997



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {

Review comment:
       done







[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r540379993



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {

Review comment:
       This will be deprecated in favor of `KafkaStateCheckpointMarker`.







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r561204115



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());
+  }
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   * @param id CheckpointId associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of local state store names and StateCheckpointMarkers for each state backend system
+   */
+  public Checkpoint(CheckpointId id, Map<SystemStreamPartition, String> inputOffsets, Map<String, List<StateCheckpointMarker>> stateCheckpoints) {
+    this.checkpointId = id;
+    this.inputOffsets = inputOffsets;
+    this.stateCheckpoints = stateCheckpoints;

Review comment:
       Create an immutable copy of the map.
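
       One way to do this, sketched under assumptions: the class name CheckpointSketch and the String value types are illustrative, not the PR's actual types. The constructor copies the incoming map and wraps the copy, so the getter can hand out the field directly.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Checkpoint; value types are simplified to String.
final class CheckpointSketch {
  private final Map<String, String> inputOffsets;

  CheckpointSketch(Map<String, String> inputOffsets) {
    // Copy first so later changes to the caller's map are not visible here,
    // then wrap so that holders of this reference cannot mutate the copy.
    this.inputOffsets = Collections.unmodifiableMap(new HashMap<>(inputOffsets));
  }

  Map<String, String> getInputOffsets() {
    // Already unmodifiable, so it is safe to return as-is.
    return inputOffsets;
  }
}
```

       Note that this is a shallow copy. For a map whose values are lists, as with stateCheckpoints, the inner lists would need the same treatment to be fully immutable.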

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());
+  }
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   * @param id CheckpointId associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of local state store names and StateCheckpointMarkers for each state backend system
+   */
+  public Checkpoint(CheckpointId id, Map<SystemStreamPartition, String> inputOffsets, Map<String, List<StateCheckpointMarker>> stateCheckpoints) {
+    this.checkpointId = id;
+    this.inputOffsets = inputOffsets;
+    this.stateCheckpoints = stateCheckpoints;
+  }
+
+  /**
+   * Gets the checkpoint id for the checkpoint
+   * @return The timestamp based checkpoint identifier associated with the checkpoint
+   */
+  public CheckpointId getCheckpointId() {
+    return checkpointId;
   }
 
   /**
    * Gets a unmodifiable view of the current Samza stream offsets.
    * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
    */
-  public Map<SystemStreamPartition, String> getOffsets() {
-    return Collections.unmodifiableMap(offsets);
+  public Map<SystemStreamPartition, String> getInputOffsets() {
+    return Collections.unmodifiableMap(inputOffsets);
+  }
+
+  /**
+   * Gets the stateCheckpointMarkers
+   * @return The state checkpoint markers for the checkpoint
+   */
+  public Map<String, List<StateCheckpointMarker>> getStateCheckpointMarkers() {
+    return stateCheckpoints;

Review comment:
       Minor: Return an immutable copy.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -41,6 +41,14 @@ public CheckpointId(long millis, long nanos) {
     this.nanos = nanos;

Review comment:
       Can this be a private constructor so that the only way to create checkpoint IDs is through the public static create() method?
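
       A rough sketch of the shape being asked for, assuming a simplified class: CheckpointIdSketch and the way create() fills in its fields are illustrative assumptions, not the actual CheckpointId implementation.

```java
// Hypothetical stand-in for CheckpointId.
final class CheckpointIdSketch {
  private final long millis;
  private final long nanos;

  // Private: instances can only be obtained through create().
  private CheckpointIdSketch(long millis, long nanos) {
    this.millis = millis;
    this.nanos = nanos;
  }

  // Single public entry point; the timestamp sources used here are an assumption.
  static CheckpointIdSketch create() {
    return new CheckpointIdSketch(System.currentTimeMillis(), System.nanoTime());
  }

  long getMillis() {
    return millis;
  }

  long getNanos() {
    return nanos;
  }
}
```

       Keeping the constructor private means every id goes through one code path, which makes it harder to construct ids with arbitrary values elsewhere in the codebase.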

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarkerSerde.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+
+
+public class StateCheckpointMarkerSerde<T extends StateCheckpointMarker> {
+  private static final short PROTOCOL_VERSION = 1;
+  private static final String SCM_SEPARATOR = ":";
+
+  public String serialize(T payload, StateCheckpointPayloadSerde<T> serde) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(PROTOCOL_VERSION);
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.getClass().getName());
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.serialize(payload));
+
+    return builder.toString();
+  }
+
+  public T deserializePayload(String serializedSCM) {
+    if (StringUtils.isBlank(serializedSCM)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + serializedSCM);
+    }
+    String[] parts = serializedSCM.split(SCM_SEPARATOR);
+    if (parts.length != 3) {
+      throw new IllegalArgumentException("Invalid RemoteStore Metadata offset: " + serializedSCM);

Review comment:
       s/RemoteStore Metadata offset/state checkpoint marker

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarkerSerde.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+
+
+public class StateCheckpointMarkerSerde<T extends StateCheckpointMarker> {
+  private static final short PROTOCOL_VERSION = 1;
+  private static final String SCM_SEPARATOR = ":";
+
+  public String serialize(T payload, StateCheckpointPayloadSerde<T> serde) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(PROTOCOL_VERSION);
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.getClass().getName());
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.serialize(payload));
+
+    return builder.toString();
+  }
+
+  public T deserializePayload(String serializedSCM) {
+    if (StringUtils.isBlank(serializedSCM)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + serializedSCM);
+    }
+    String[] parts = serializedSCM.split(SCM_SEPARATOR);

Review comment:
       The serialized payload on line 36 may contain the separator as well. It's better to split into a fixed-length array (3 parts) and let the payload serde throw an exception if parts[2] cannot be parsed correctly.
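
       A minimal sketch of the fixed-length split, using the two-argument form of String.split. The class name ScmSplitSketch, the helper splitMarker, and the sample input are assumptions made for illustration.

```java
import java.util.Arrays;

// Hypothetical helper illustrating the suggested parsing.
final class ScmSplitSketch {
  private static final String SCM_SEPARATOR = ":";

  // Splits into at most 3 parts: version, serde class name, payload.
  // Any separators after the second one stay inside the payload part.
  static String[] splitMarker(String serializedScm) {
    String[] parts = serializedScm.split(SCM_SEPARATOR, 3);
    if (parts.length != 3) {
      throw new IllegalArgumentException("Invalid state checkpoint marker: " + serializedScm);
    }
    return parts;
  }

  public static void main(String[] args) {
    // The payload "topic;0:12345" itself contains the separator.
    String[] parts = splitMarker("1:com.example.MySerde:topic;0:12345");
    System.out.println(Arrays.toString(parts));
  }
}
```

       With the limit set to 3 the payload is passed through intact, and deciding whether it is well formed is left to the payload serde.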

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}

Review comment:
       Why default?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());

Review comment:
       Why a fake placeholder id instead of a valid but unused id?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
##########
@@ -35,6 +38,16 @@
  * @param <V> the type of values maintained by this key-value store.
  */
 public interface KeyValueStore<K, V> {
+
+  /**
+   * Initiates the KeyValueStore
+   *
+   * @param containerContext context of the KeyValueStore's container
+   * @param jobContext context of the job the KeyValueStore is in
+   * @param externalContext any external store required for initialization
+   */
+  default void init(ContainerContext containerContext, JobContext jobContext, ExternalContext externalContext) { }

Review comment:
       Minor: Reverse order (External, Job, Container) to match scope.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)

Review comment:
       Update message to mention commit instead of flushing stores. Flushing stores step should go into the commit / storage manager instead.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
+    // Commit state and input checkpoints offsets atomically

Review comment:
       Instead of 'commit' which is overloaded in this context, let's explain that this is "Write input offsets and state checkpoint markers to the checkpoint topic atomically"

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
+    // Commit state and input checkpoints offsets atomically
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
-    if (storageManager != null) {
+    // Perform cleanup on unused checkpoints
+    if (commitManager != null) {
       trace("Remove old checkpoint stores for taskName: %s" format taskName)

Review comment:
       Generalize message and just say 'Cleaning up old checkpoint state'. Also include checkpointId in the trace message.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
+    // Commit state and input checkpoints offsets atomically
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
-    if (storageManager != null) {
+    // Perform cleanup on unused checkpoints
+    if (commitManager != null) {

Review comment:
       Can this null check be removed?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);
+        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
+      }
+
+      // TODO: call commit on multiple backup managers when available

Review comment:
       I'd recommend handling this (multiple backup managers per store) now to make sure all the interfaces, logging etc are general and consistent.
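
   A rough sketch of what the merge step could look like once several backup managers exist per task. This is illustrative only: `MultiBackendCommitSketch` and `mergeUploads` are hypothetical names, and `String` stands in for `StateCheckpointMarker` so the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: String stands in for StateCheckpointMarker.
class MultiBackendCommitSketch {

  /** Waits for every backend upload, then groups the markers by store name. */
  static CompletableFuture<Map<String, List<String>>> mergeUploads(
      List<CompletableFuture<Map<String, String>>> uploads) {
    CompletableFuture<Void> all =
        CompletableFuture.allOf(uploads.toArray(new CompletableFuture[0]));
    return all.thenApply(ignored -> {
      Map<String, List<String>> merged = new HashMap<>();
      for (CompletableFuture<Map<String, String>> upload : uploads) {
        // join() does not block here because allOf has already completed.
        upload.join().forEach((store, marker) ->
            merged.computeIfAbsent(store, k -> new ArrayList<>()).add(marker));
      }
      return merged;
    });
  }
}
```

   With this shape the return type of commit (store name to a list of markers) falls out naturally, one marker per backend for each store.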

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  private static SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");

Review comment:
       s/defaultFileDir/defaultStoreBaseDir

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {

Review comment:
       Can this be removed in this PR or do we need to keep this around for backwards compatibility?

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -20,62 +20,70 @@
 package org.apache.samza.storage
 
 import java.io._
+import java.util
+import java.util.concurrent.CompletableFuture
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
-import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{CheckpointId, StateCheckpointMarker}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Manage all the storage engines for a given task
  */
-class NonTransactionalStateTaskStorageManager(
+class KafkaNonTransactionalStateTaskBackupManager(
   taskName: TaskName,
-  containerStorageManager: ContainerStorageManager,
-  storeChangelogs: Map[String, SystemStream] = Map(),
+  taskStores: util.Map[String, StorageEngine],
+  storeChangelogs: util.Map[String, SystemStream] = new util.HashMap[String, SystemStream](),
   systemAdmins: SystemAdmins,
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging with TaskStorageManager {
+  partition: Partition) extends Logging with TaskBackupManager {
 
   private val storageManagerUtil = new StorageManagerUtil
-  private val persistedStores = containerStorageManager.getAllStores(taskName).asScala
+  private val persistedStores = taskStores.asScala
     .filter { case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk }
 
-  def getStore(storeName: String): Option[StorageEngine] =  JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
-
-  def flush(): Map[SystemStreamPartition, Option[String]] = {
+  override def snapshot(checkpointId: CheckpointId): util.Map[String, StateCheckpointMarker] = {
     debug("Flushing stores.")
-    containerStorageManager.getAllStores(taskName).asScala.values.foreach(_.flush)
+    taskStores.asScala.values.foreach(_.flush)
     val newestChangelogSSPOffsets = getNewestChangelogSSPOffsets()
-    writeChangelogOffsetFiles(newestChangelogSSPOffsets)
+    writeChangelogOffsetFiles(KafkaStateCheckpointMarker
+      .stateCheckpointMarkerToSSPmap(newestChangelogSSPOffsets))
     newestChangelogSSPOffsets
   }
 
-  override def checkpoint(checkpointId: CheckpointId,
-    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
+  override def upload(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): CompletableFuture[util.Map[String, StateCheckpointMarker]] = {
+     CompletableFuture.completedFuture(stateCheckpointMarkers)
+  }
+
+  override def persistToFilesystem(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): Unit = {}
 
-  override def removeOldCheckpoints(checkpointId: CheckpointId): Unit = {}
+  override def cleanUp(checkpointId: CheckpointId): Unit = {}
 
   @VisibleForTesting
-  def stop() {
+  def close() {

Review comment:
       override

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaTransactionalStateTaskBackupManager.scala
##########
@@ -73,10 +80,11 @@ class TransactionalStateTaskStorageManager(
       }}
       .toMap
 
-    writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, newestChangelogOffsets)
+    writeChangelogOffsetFiles(checkpointPaths, storeChangelogs,
+       KafkaStateCheckpointMarker.stateCheckpointMarkerToSSPmap(stateCheckpointMarkers).asScala)
   }
 
-  def removeOldCheckpoints(latestCheckpointId: CheckpointId): Unit = {
+  def cleanUp(latestCheckpointId: CheckpointId): Unit = {

Review comment:
       override def. Same for the other overridden methods.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null

Review comment:
       s/newestStateCheckpointMakers/stateCheckpointMarkers.
   The "newest" prefix was in the name earlier to indicate that for Kafka changelog topics we need to flush them first to get the latest offset. That is not relevant in this class anymore.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}
+
+  /**
+   * Commit operation that is synchronous to processing
+   * @param checkpointId Checkpoint id of the current commit
+   * @return The storename to checkpoint of the snapshotted local store
+   */
+  Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+  /**
+   * Commit operation that is asynchronous to message processing,
+   * @param checkpointId Checkpoint id of the current commit
+   * @param stateCheckpointMarkers The map of storename to checkpoint makers returned by the snapshot
+   * @return The future of storename to checkpoint map of the uploaded local store
+   */
+  CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Persist the state locally to the file system
+   * @param checkpointId The id of the checkpoint to be committed
+   * @param stateCheckpointMarkers Uploaded storename to checkpoints markers to be persisted locally
+   */
+  void persistToFilesystem(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Cleanup any local or remote state for obsolete checkpoint information that are older than checkpointId
+   * @param checkpointId The id of the latest successfully committed checkpoint
+   */
+  void cleanUp(CheckpointId checkpointId);
+
+  /**
+   * Used for testing as a shutdown hook to cleanup any allocated resources

Review comment:
       Let's remove "used for testing" and make this a part of the public interface to mirror init.
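
   Something along these lines, as a sketch only. `LifecycleSketch` and `RecordingBackupManager` are hypothetical names and not the actual TaskBackupManager; the point is just that close() becomes a default lifecycle method symmetric with init().

```java
// Hypothetical interface, not the actual TaskBackupManager.
interface LifecycleSketch {

  /** Allocates any resources needed before the first commit. */
  default void init() {}

  /** Releases resources allocated by init(); a no-op unless overridden. */
  default void close() {}
}

// Hypothetical implementation that records the lifecycle calls it receives.
class RecordingBackupManager implements LifecycleSketch {
  final StringBuilder events = new StringBuilder();

  @Override
  public void init() {
    events.append("init;");
  }

  @Override
  public void close() {
    events.append("close;");
  }
}
```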

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null

Review comment:
       Will the commit manager ever be null? If not, instead of a mutable nullable variable, just return an empty map from commitManager if there are no state checkpoint markers.
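
   For illustration, a no-op commit manager could look roughly like the following. `CommitManagerSketch` and `NO_OP` are hypothetical names, and `String` stands in for TaskName, CheckpointId and StateCheckpointMarker.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the commit manager; String replaces the Samza types.
interface CommitManagerSketch {

  /** Returns store name to markers; never null, empty when nothing was committed. */
  Map<String, List<String>> commit(String taskName, String checkpointId);

  /** Used for tasks without stores, so callers never need a null check. */
  CommitManagerSketch NO_OP = (taskName, checkpointId) -> Collections.emptyMap();
}
```

   The caller can then assign the result to a val and pass it straight to the Checkpoint constructor.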

##########
File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -52,7 +52,8 @@ import org.apache.samza.util.{Util, _}
 import org.apache.samza.SamzaException
 import org.apache.samza.clustermanager.StandbyTaskUtil
 
-import scala.collection.JavaConversions
+import scala.collection.JavaConversions.mapAsScalaMap

Review comment:
       JavaConversions is deprecated. Use JavaConverters instead.
   
   https://stackoverflow.com/questions/8301947/what-is-the-difference-between-javaconverters-and-javaconversions-in-scala

##########
File path: samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker implements StateCheckpointMarker {
+  private static final short PROTO_VERSION = 1;
+  private static final String FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  private static final String SEPARATOR = ";";
+
+  // One offset per SSP
+  private final SystemStreamPartition ssp;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition ssp, String changelogOffset) {
+    this.ssp = ssp;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static short getProtocolVersion() {
+    return PROTO_VERSION;
+  }
+
+  public SystemStreamPartition getSsp() {
+    return ssp;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  @Override
+  public String getFactoryName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;
+    return Objects.equals(ssp, that.ssp) &&
+        Objects.equals(changelogOffset, that.changelogOffset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(PROTO_VERSION, ssp, changelogOffset);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s%s%s%s%s%s%s%s%s", PROTO_VERSION, SEPARATOR,
+      ssp.getSystem(), SEPARATOR, ssp.getStream(), SEPARATOR, ssp.getPartition(), SEPARATOR, changelogOffset);
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String message) {
+    if (StringUtils.isBlank(message)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + message);
+    }
+    String[] payload = message.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 5) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker argument count: " + message);
+    }
+    if (Short.parseShort(payload[0]) != PROTO_VERSION) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker protocol version: " + message);
+    }
+    Partition partition = new Partition(Integer.parseInt(payload[3]));
+    String offset = null;
+    if (!"null".equals(payload[4])) {
+      offset = payload[4];
+    }
+
+    return new KafkaStateCheckpointMarker(new SystemStreamPartition(payload[1], payload[2], partition), offset);
+  }
+
+  public static Map<SystemStreamPartition, Option<String>> stateCheckpointMarkerToSSPmap(Map<String, StateCheckpointMarker> markers) {

Review comment:
       Javadoc for public method.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -224,8 +228,8 @@ class TaskInstance(
     val allCheckpointOffsets = new java.util.HashMap[SystemStreamPartition, String]()
     val inputCheckpoint = offsetManager.buildCheckpoint(taskName)
     if (inputCheckpoint != null) {
-      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getOffsets))
-      allCheckpointOffsets.putAll(inputCheckpoint.getOffsets)
+      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getInputOffsets))
+      allCheckpointOffsets.putAll(inputCheckpoint.getInputOffsets)

Review comment:
       It looks like we don't need a separate allCheckpointOffsets map anymore and inputCheckpoint will suffice.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;

Review comment:
       Does this need to be a per store map?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);
+        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
+      }
+
+      // TODO: call commit on multiple backup managers when available
+      return mergeCheckpoints(uploadMap);
+    } catch (TimeoutException e) {
+      throw new SamzaException("Upload timed out, commitTimeoutMs: " + COMMIT_TIMEOUT_MS, e);
+    } catch (Exception e) {
+      throw new SamzaException("Upload commit portion could not be completed", e);

Review comment:
       Include store and task information in error message.
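
   A sketch of the kind of message I have in mind. `CommitErrorContextSketch` and `awaitUpload` are hypothetical names, RuntimeException stands in for SamzaException, and `String` replaces the Samza types so the snippet is self-contained.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: RuntimeException stands in for SamzaException.
class CommitErrorContextSketch {

  /** Waits for the upload and wraps any failure with task, checkpoint and store context. */
  static Map<String, String> awaitUpload(String taskName, String checkpointId,
      Map<String, String> snapshot, CompletableFuture<Map<String, String>> upload, long timeoutMs) {
    try {
      return upload.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new RuntimeException(String.format(
          "Upload timed out after %d ms for taskName: %s, checkpointId: %s, stores: %s",
          timeoutMs, taskName, checkpointId, snapshot.keySet()), e);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException(String.format(
          "Upload interrupted for taskName: %s, checkpointId: %s, stores: %s",
          taskName, checkpointId, snapshot.keySet()), e);
    } catch (Exception e) {
      throw new RuntimeException(String.format(
          "Upload failed for taskName: %s, checkpointId: %s, stores: %s",
          taskName, checkpointId, snapshot.keySet()), e);
    }
  }
}
```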

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -73,8 +76,9 @@ class TaskInstance(
 
   private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
     (storeName: String) => {
-      if (storageManager != null && storageManager.getStore(storeName).isDefined) {
-        storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]]
+      if (containerStorageManager != null) {
+        val storeOption = JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption

Review comment:
       Probably don't need to convert the Java Optional to a Scala Option here just to call isDefined; the Optional can be checked directly.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))

Review comment:
       Update trace message, log state checkpoint markers as well.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);

Review comment:
       Does this need to be thread-safe? Same for the method below.
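
If the factory can be reached from several threads, one way to make the lazy creation safe is double-checked locking on a volatile field. The sketch below is only an illustration: `LazyShared` is a hypothetical helper, not part of Samza, and the `Supplier` stands in for the `new StreamMetadataCache(...)` / `new SSPMetadataCache(...)` calls.

```java
import java.util.function.Supplier;

// Hypothetical helper (not a Samza class): lazily creates one shared instance.
final class LazyShared<T> {
  // volatile is what makes double-checked locking safe under the Java memory model
  private volatile T instance;

  T getOrCreate(Supplier<T> factory) {
    T local = instance;
    if (local == null) {
      synchronized (this) {
        local = instance;          // re-check while holding the lock
        if (local == null) {
          local = factory.get();   // runs at most once, assuming it returns non-null
          instance = local;
        }
      }
    }
    return local;
  }
}
```

A plain `synchronized` static getter would also work and is simpler if contention is not a concern.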

##########
File path: samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
##########
@@ -31,6 +31,7 @@ import java.util.Optional
 
 import com.google.common.annotations.VisibleForTesting
 import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.context.{ContainerContext, Context, ExternalContext, JobContext}

Review comment:
       Unused import?

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {

Review comment:
       Move private methods below public methods.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  private static SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition(),
+          taskModel.getTaskMode(), new StorageManagerUtil());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Map<String, SystemStream> filteredStoreChangelogs = ContainerStorageManager
+        .getChangelogSystemStreams(containerModel, storeChangelogs, null);
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+    File nonLoggedStoreBaseDir = SamzaContainer.getNonLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          taskModel,
+          taskStores,
+          filteredStoreChangelogs,
+          systemAdmins,
+          null, // TODO @dchen have the restore managers create and manage Kafka consume lifecycle
+          KafkaChangelogStateBackendFactory
+              .getSspCache(systemAdmins, clock, Collections.emptySet()),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          taskModel,
+          filteredStoreChangelogs,
+          taskStores,
+          systemAdmins,
+          KafkaChangelogStateBackendFactory.getStreamCache(systemAdmins, clock),
+          null, // TODO  @dchen have the restore managers create and manage Kafka consume lifecycle

Review comment:
       What does this mean? Is this required for this PR to work E2E with existing jobs?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {

Review comment:
       Let's include some more details in the class and method javadocs explaining what these concepts (snapshots, uploads, cleanup etc.) are. Let's assume that the reader has not read the design doc and is not familiar with the new commit process.
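
As a starting point for those javadocs, the call order listed in the class comment can be sketched as below. This is a simplified illustration, not the real API: `BackupSketch`, `RecordingBackup` and `LifecycleSketch` are hypothetical names, `String` stands in for `CheckpointId` and `StateCheckpointMarker`, and `cleanUp` is called inline here even though the PR exposes it as a separate step.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for TaskBackupManager.
interface BackupSketch {
  Map<String, String> snapshot(String checkpointId);
  CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> markers);
  void persistToFilesystem(String checkpointId, Map<String, String> markers);
  void cleanUp(String checkpointId);
}

// Records the order in which the lifecycle methods are invoked.
final class RecordingBackup implements BackupSketch {
  final List<String> calls = new ArrayList<>();

  @Override
  public Map<String, String> snapshot(String checkpointId) {
    calls.add("snapshot");
    return Collections.singletonMap("store", "marker-" + checkpointId);
  }

  @Override
  public CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> markers) {
    calls.add("upload");
    return CompletableFuture.completedFuture(markers);
  }

  @Override
  public void persistToFilesystem(String checkpointId, Map<String, String> markers) {
    calls.add("persistToFilesystem");
  }

  @Override
  public void cleanUp(String checkpointId) {
    calls.add("cleanUp");
  }
}

final class LifecycleSketch {
  // snapshot -> upload -> persistToFilesystem -> cleanUp, each step only after the previous one succeeded
  static void commit(BackupSketch backup, String checkpointId) {
    Map<String, String> snapshot = backup.snapshot(checkpointId);
    Map<String, String> uploaded = backup.upload(checkpointId, snapshot).join();
    backup.persistToFilesystem(checkpointId, uploaded);
    backup.cleanUp(checkpointId);
  }
}
```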

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review comment:
       It would be better not to include the commit timeout in this version until we have the concurrency handled. Otherwise, large commits might time out and fail the job instead of just stalling processing for some time.
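
To make the trade-off concrete, here is a minimal sketch of the two ways of waiting on the upload future. `CommitWaitSketch` and both method names are hypothetical; `IllegalStateException` stands in for `SamzaException` so the sketch compiles without Samza.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not a Samza class) contrasting the two waiting strategies.
final class CommitWaitSketch {
  // No timeout: a slow upload stalls the caller, but a large commit never fails on time alone.
  static <T> T awaitUnbounded(CompletableFuture<T> upload) {
    return upload.join();
  }

  // With timeout: a slow upload becomes a failure once timeoutMs has elapsed.
  static <T> T awaitBounded(CompletableFuture<T> upload, long timeoutMs) {
    try {
      return upload.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new IllegalStateException("Upload timed out after " + timeoutMs + " ms", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
      throw new IllegalStateException("Interrupted while waiting for upload", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Upload failed", e.getCause());
    }
  }
}
```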

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -20,62 +20,70 @@
 package org.apache.samza.storage
 
 import java.io._
+import java.util
+import java.util.concurrent.CompletableFuture
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
-import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{CheckpointId, StateCheckpointMarker}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Manage all the storage engines for a given task
  */
-class NonTransactionalStateTaskStorageManager(
+class KafkaNonTransactionalStateTaskBackupManager(
   taskName: TaskName,
-  containerStorageManager: ContainerStorageManager,
-  storeChangelogs: Map[String, SystemStream] = Map(),
+  taskStores: util.Map[String, StorageEngine],
+  storeChangelogs: util.Map[String, SystemStream] = new util.HashMap[String, SystemStream](),
   systemAdmins: SystemAdmins,
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging with TaskStorageManager {
+  partition: Partition) extends Logging with TaskBackupManager {
 
   private val storageManagerUtil = new StorageManagerUtil
-  private val persistedStores = containerStorageManager.getAllStores(taskName).asScala
+  private val persistedStores = taskStores.asScala
     .filter { case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk }
 
-  def getStore(storeName: String): Option[StorageEngine] =  JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
-
-  def flush(): Map[SystemStreamPartition, Option[String]] = {
+  override def snapshot(checkpointId: CheckpointId): util.Map[String, StateCheckpointMarker] = {
     debug("Flushing stores.")
-    containerStorageManager.getAllStores(taskName).asScala.values.foreach(_.flush)
+    taskStores.asScala.values.foreach(_.flush)
     val newestChangelogSSPOffsets = getNewestChangelogSSPOffsets()
-    writeChangelogOffsetFiles(newestChangelogSSPOffsets)
+    writeChangelogOffsetFiles(KafkaStateCheckpointMarker
+      .stateCheckpointMarkerToSSPmap(newestChangelogSSPOffsets))
     newestChangelogSSPOffsets
   }
 
-  override def checkpoint(checkpointId: CheckpointId,
-    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
+  override def upload(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): CompletableFuture[util.Map[String, StateCheckpointMarker]] = {
+     CompletableFuture.completedFuture(stateCheckpointMarkers)
+  }
+
+  override def persistToFilesystem(checkpointId: CheckpointId,

Review comment:
       Why is this a no-op? Isn't this what should be calling `writeChangelogOffsetFiles`?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);

Review comment:
       Trace-log the returned SCMs (StateCheckpointMarkers).

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);
+        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
+      }
+
+      // TODO: call commit on multiple backup managers when available
+      return mergeCheckpoints(uploadMap);
+    } catch (TimeoutException e) {
+      throw new SamzaException("Upload timed out, commitTimeoutMs: " + COMMIT_TIMEOUT_MS, e);
+    } catch (Exception e) {
+      throw new SamzaException("Upload commit portion could not be completed", e);
+    }
+  }
+
+  public void cleanUp(CheckpointId checkpointId) {
+    storageBackupManager.cleanUp(checkpointId);
+  }
+
+  private Map<String, List<StateCheckpointMarker>> mergeCheckpoints(Map<String, StateCheckpointMarker>... stateCheckpoints) {
+    if (stateCheckpoints == null || stateCheckpoints.length < 1) {
+      return null;
+    }
+    Map<String, StateCheckpointMarker> firstCheckpoint = stateCheckpoints[0];
+    if (firstCheckpoint == null) {
+      return null;

Review comment:
       Return empty collections instead of nulls wherever applicable. Avoids accidental NPEs.
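
A minimal sketch of what that could look like. The rest of `mergeCheckpoints` is not visible in this hunk, so the merge logic below (grouping markers by store name) is an assumption; `MergeSketch` is a hypothetical name and the type parameter `M` stands in for `StateCheckpointMarker`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Samza class): never returns null.
final class MergeSketch {
  @SafeVarargs
  static <M> Map<String, List<M>> merge(Map<String, M>... checkpoints) {
    if (checkpoints == null || checkpoints.length == 0) {
      return Collections.emptyMap(); // empty and immutable instead of null
    }
    Map<String, List<M>> merged = new HashMap<>();
    for (Map<String, M> checkpoint : checkpoints) {
      if (checkpoint == null) {
        continue; // skip missing inputs rather than propagating null
      }
      checkpoint.forEach((store, marker) ->
          merged.computeIfAbsent(store, k -> new ArrayList<>()).add(marker));
    }
    return merged;
  }
}
```

Callers can then iterate the result or call `isEmpty()` without a null check.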

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);

Review comment:
       Clarify the log message to say what this step is doing ("persisting ..." instead of the more general "checkpointing ...").

##########
File path: samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker implements StateCheckpointMarker {
+  private static final short PROTO_VERSION = 1;
+  private static final String FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  private static final String SEPARATOR = ";";
+
+  // One offset per SSP
+  private final SystemStreamPartition ssp;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition ssp, String changelogOffset) {
+    this.ssp = ssp;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static short getProtocolVersion() {
+    return PROTO_VERSION;
+  }
+
+  public SystemStreamPartition getSsp() {
+    return ssp;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  @Override
+  public String getFactoryName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public boolean equals(Object o) {

Review comment:
       Minor: Move equals/hashcode/toString below other public methods.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());

Review comment:
       Minor: Use Collections.emptyMap() (the type-safe form of Collections.EMPTY_MAP) or another immutable empty map so that this cannot be accidentally modified.
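
For illustration, a small self-contained check of the difference: `Collections.emptyMap()` rejects writes, while a fresh `HashMap` accepts them. `EmptyMapSketch` and `rejectsWrites` are hypothetical names used only for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class EmptyMapSketch {
  // Returns true if the given map refuses to be modified.
  static boolean rejectsWrites(Map<String, String> map) {
    try {
      map.put("key", "value");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Map<String, String> immutableEmpty = Collections.emptyMap();
    System.out.println(rejectsWrites(immutableEmpty));   // prints "true"
    System.out.println(rejectsWrites(new HashMap<>()));  // prints "false"
  }
}
```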

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {

Review comment:
       Clarify in javadocs or inline comment at call site what we're doing here (sharing metadata cache across all state backend instances)

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}

Review comment:
       Do we need to pass in the Context classes to init?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarkerSerde.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+
+
+public class StateCheckpointMarkerSerde<T extends StateCheckpointMarker> {
+  private static final short PROTOCOL_VERSION = 1;
+  private static final String SCM_SEPARATOR = ":";
+
+  public String serialize(T payload, StateCheckpointPayloadSerde<T> serde) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(PROTOCOL_VERSION);
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.getClass().getName());
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.serialize(payload));
+
+    return builder.toString();
+  }
+
+  public T deserializePayload(String serializedSCM) {
+    if (StringUtils.isBlank(serializedSCM)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + serializedSCM);
+    }
+    String[] parts = serializedSCM.split(SCM_SEPARATOR);
+    if (parts.length != 3) {
+      throw new IllegalArgumentException("Invalid RemoteStore Metadata offset: " + serializedSCM);

Review comment:
       Also let's make sure the call site logs/includes the source task + store name information when this exception happens.







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r594681640



##########
File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
##########
@@ -323,7 +319,7 @@ public boolean storeExists(File storeDir) {
    * If the file does not exist, returns null.
    * // TODO HIGH dchen add tests at all call sites for handling null value.
    *
-   * @param storagePartitionDir base directory for the store
+   * @param storagePartitionDir base directory for the store checkpoint file

Review comment:
       "store directory to read the checkpoint file from"
   
   "base directory" is overloaded, it's used to refer to the "logged-store" dir as well as the task store dir, but not the store checkpoint dir.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -43,32 +45,40 @@
 public interface TaskBackupManager {
 
   /**
-   * Initializes the TaskBackupManager instance
-   * @param checkpoint Last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
+   * Initializes the TaskBackupManager instance.
+   *
+   * @param checkpoint last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
    */
   void init(@Nullable Checkpoint checkpoint);
 
   /**
-   * Commit operation that is synchronous to processing
+   *  Snapshot is used to capture the current state of the stores in order to persist it to the backup manager in the
+   *  {@link #upload(CheckpointId, Map)} phase. Performs the commit operation that is synchronous
+   *  to processing. Returns the per store name state snapshotted checkpoints to be used in upload.
+   *
    * @param checkpointId {@link CheckpointId} of the current commit
    * @return a map of store name to state checkpoint markers for stores managed by this state backend
    */
   Map<String, String> snapshot(CheckpointId checkpointId);
 
   /**
-   * Commit operation that is asynchronous to message processing,
+   * Upload is used to persist to state provided by the {@link #snapshot(CheckpointId)} to the

Review comment:
       s/persist to state/persist the state

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -139,18 +132,16 @@ public void init() {
   }
 
   /**
-   * // TODO HIGH dchen fix method name and documentation. local file system isn't very relevant here (too general).
-   * // Maybe rename to "writeCheckpointToStoreDirs" or something and update docs.
    * Writes the {@link Checkpoint} returned by {@link #commit(CheckpointId)}
    * locally to the file system on disk if the checkpoint passed in is an instance of {@link CheckpointV2},
    * otherwise if it is an instance of {@link CheckpointV1} persists the Kafka changelog ssp-offsets only.
    *
    * Note: The assumption is that this method will be invoked once for each {@link Checkpoint} version that the
-   * task needs to write.
+   * task needs to write, once per checkpoint version for backwards compatibility.

Review comment:
       "needs to write, as determined by {@link TaskConfig#getCheckpointVersionsToWrite} (or whatever the method name is). This is required for upgrade and rollback compatibility."
   

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -139,18 +132,16 @@ public void init() {
   }
 
   /**
-   * // TODO HIGH dchen fix method name and documentation. local file system isn't very relevant here (too general).
-   * // Maybe rename to "writeCheckpointToStoreDirs" or something and update docs.
    * Writes the {@link Checkpoint} returned by {@link #commit(CheckpointId)}
    * locally to the file system on disk if the checkpoint passed in is an instance of {@link CheckpointV2},
    * otherwise if it is an instance of {@link CheckpointV1} persists the Kafka changelog ssp-offsets only.
    *
    * Note: The assumption is that this method will be invoked once for each {@link Checkpoint} version that the
-   * task needs to write.
+   * task needs to write, once per checkpoint version for backwards compatibility.
    *
    * @param checkpoint the latest checkpoint to be persisted to local file system
    */
-  public void persistToLocalFileSystem(Checkpoint checkpoint) {
+  public void writeCheckpointToStoreDirectory(Checkpoint checkpoint) {

Review comment:
       `Directories` (since this writes to both store dir and store checkpoint dir). Can also clarify this in the method javadocs. E.g.: 
   
   "Writes the {@link Checkpoint} information returned by {@link #commit(CheckpointId)} in each store directory and store checkpoint directory. For CheckpointV2, writes the entire task {@link CheckpointV2}. For CheckpointV1, only writes the changelog ssp offsets in the OFFSET* files."

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -181,27 +176,26 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
   }
 
   /**
-   * // TODO MED dchen: fix documentation
-   * Cleanup the commit state for each of the task backup managers
+   * Cleanup on the commit state from the {@link #commit(CheckpointId)} call. Evokes the cleanup the commit state

Review comment:
       "Performs any post-commit and cleanup actions after the {@link Checkpoint} is successfully written to the checkpoint topic. Invokes {@link TaskStorageBackupManager#cleanup} on each of the configured backup managers. Deletes all local store checkpoint directories older than the {@param latestCheckpointId}."

##########
File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
##########
@@ -246,15 +245,12 @@ public void writeOffsetFile(File storeDir, Map<SystemStreamPartition, String> of
 
   /**
    * Writes the checkpoint to the store checkpoint directory based on the checkpointId.
-   * // TODO HIGH dchen why assume writing to "checkpoint directory" instead of arbitrary directory?
-   * // TODO HIGH dchen fix param descriptions
-   * @param checkpointDir base store directory to write the checkpoint to
+   *
+   * @param storeDir base store directory to write the checkpoint to

Review comment:
       "store or store checkpoint directory to write the checkpoint file to"

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -43,32 +45,40 @@
 public interface TaskBackupManager {
 
   /**
-   * Initializes the TaskBackupManager instance
-   * @param checkpoint Last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
+   * Initializes the TaskBackupManager instance.
+   *
+   * @param checkpoint last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
    */
   void init(@Nullable Checkpoint checkpoint);
 
   /**
-   * Commit operation that is synchronous to processing
+   *  Snapshot is used to capture the current state of the stores in order to persist it to the backup manager in the
+   *  {@link #upload(CheckpointId, Map)} phase. Performs the commit operation that is synchronous
+   *  to processing. Returns the per store name state snapshotted checkpoints to be used in upload.
+   *
    * @param checkpointId {@link CheckpointId} of the current commit
    * @return a map of store name to state checkpoint markers for stores managed by this state backend
    */
   Map<String, String> snapshot(CheckpointId checkpointId);
 
   /**
-   * Commit operation that is asynchronous to message processing,
+   * Upload is used to persist to state provided by the {@link #snapshot(CheckpointId)} to the
+   * underlying backup system. Commit operation that is asynchronous to message processing and returns a
+   * {@link CompletableFuture} containing the successfully uploaded state checkpoints.

Review comment:
       "uploaded state checkpoint markers"
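
The javadocs under review describe a two-phase commit: a synchronous `snapshot` followed by an asynchronous `upload`, with cleanup only after the upload succeeds. The following is a rough sketch of that ordering under simplifying assumptions: `BackupManagerSketch`, `RecordingBackupManager`, and `CommitFlowSketch` are hypothetical names, checkpoint ids are plain strings rather than `CheckpointId`, and the step that writes the checkpoint between upload and cleanup is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the interface under review.
interface BackupManagerSketch {
  Map<String, String> snapshot(String checkpointId);
  CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> snapshotMarkers);
  void cleanup(String checkpointId);
}

// Records the order of calls so the flow can be inspected.
final class RecordingBackupManager implements BackupManagerSketch {
  final List<String> calls = Collections.synchronizedList(new ArrayList<>());

  @Override
  public Map<String, String> snapshot(String checkpointId) {
    calls.add("snapshot");
    Map<String, String> markers = new HashMap<>();
    markers.put("store-a", checkpointId + "/store-a");
    return markers;
  }

  @Override
  public CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> snapshotMarkers) {
    // The upload runs off the calling thread and completes with the uploaded markers.
    return CompletableFuture.supplyAsync(() -> {
      calls.add("upload");
      return snapshotMarkers;
    });
  }

  @Override
  public void cleanup(String checkpointId) {
    calls.add("cleanup");
  }
}

final class CommitFlowSketch {
  static Map<String, String> commit(BackupManagerSketch manager, String checkpointId) {
    // Synchronous phase: capture the markers for each store.
    Map<String, String> snapshotMarkers = manager.snapshot(checkpointId);
    // Asynchronous phase: cleanup is chained so it runs only after a successful upload.
    return manager.upload(checkpointId, snapshotMarkers)
        .thenApply(uploadedMarkers -> {
          manager.cleanup(checkpointId);
          return uploadedMarkers;
        })
        .join(); // blocking here only keeps the sketch short
  }
}
```

Because cleanup is attached with `thenApply`, it is skipped when the upload future completes exceptionally, which matches the ordering the javadocs describe.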

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -181,27 +176,26 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
   }
 
   /**
-   * // TODO MED dchen: fix documentation
-   * Cleanup the commit state for each of the task backup managers
+   * Cleanup on the commit state from the {@link #commit(CheckpointId)} call. Evokes the cleanup the commit state

Review comment:
       s/Evokes/Invokes in this class.







[GitHub] [samza] mynameborat commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r489513707



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageBackupManager.scala
##########
@@ -22,11 +22,31 @@ package org.apache.samza.storage
 import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.system.SystemStreamPartition
 
-trait TaskStorageManager {
+trait TaskStorageBackupManager {

Review comment:
       I looked a bit more and feel this interface is currently overloaded with responsibilities. For the most part it only handles commits for the stores related to a task, and most of the management has moved to `ContainerStorageManager`, but exposing the store through `getStore` still lingers.
   
   > If you can take a crack at removing the `getStore` method from this interface, that would be good. A few options to consider: `TaskInstance` can get store handles from CSM instead, or it can directly get the list of stores associated with it, which eliminates CSM from the picture.
   
   The above is not a blocker, but please create a follow-up if you aren't planning to do it in this PR.
   
   Given that context, I'd suggest renaming this interface to `TaskCommitManager` and not rolling the backup responsibilities into it; instead, keep them separate and use composition within the commit manager. That way you can potentially reuse some of the shared commit logic across the types (Kafka, remote store, etc.) while keeping the backup/upload part isolated in its own implementation, without a single class that manages both.
   
   Let me know your thoughts.
   
   
   

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/KafkaTransactionalStateTaskStorageBackupManager.scala
##########
@@ -39,24 +39,28 @@ import scala.collection.JavaConverters._
 /**
  * Manage all the storage engines for a given task
  */
-class TransactionalStateTaskStorageManager(
+class KafkaTransactionalStateTaskStorageBackupManager(

Review comment:
       Do you foresee any shared logic in the commit path regardless of the choice of durability? I wonder whether we should keep this as is and extract the remote store specifics out to the backup manager.







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r594676882



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -161,18 +152,22 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
           taskName, checkpoint);
       storageEngines.forEach((storeName, storageEngine) -> {
         // Only write the checkpoint file if the store is durable and persisted to disk
-        if (storageEngine.getStoreProperties().isLoggedStore() &&
+        if (storageEngine.getStoreProperties().isDurableStore() &&
             storageEngine.getStoreProperties().isPersistedToDisk()) {
           CheckpointV2 checkpointV2 = (CheckpointV2) checkpoint;
 
-          // TODO HIGH dchen add try-catch with info about task, store, checkpointId etc. Write can throw IOException.
-          File storeDir = storageManagerUtil.getTaskStoreDir(durableStoreBaseDir, storeName, taskName, TaskMode.Active);
-          storageManagerUtil.writeCheckpointFile(storeDir, checkpointV2);
+          try {
+            File storeDir = storageManagerUtil.getTaskStoreDir(durableStoreBaseDir, storeName, taskName, TaskMode.Active);
+            storageManagerUtil.writeCheckpointV2File(storeDir, checkpointV2);
 
-          // TODO HIGH dchen add try-catch with info about task, store, checkpointId etc. Write can throw IOException.
-          CheckpointId checkpointId = checkpointV2.getCheckpointId();
-          File checkpointDir = Paths.get(StorageManagerUtil.getCheckpointDirPath(storeDir, checkpointId)).toFile();
-          storageManagerUtil.writeCheckpointFile(checkpointDir, checkpointV2);
+            CheckpointId checkpointId = checkpointV2.getCheckpointId();
+            File checkpointDir = Paths.get(StorageManagerUtil.getCheckpointDirPath(storeDir, checkpointId)).toFile();
+            storageManagerUtil.writeCheckpointV2File(checkpointDir, checkpointV2);
+          } catch (Exception e) {
+            throw new SamzaException(

Review comment:
       Include original exception.
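
A small sketch of what including the original exception looks like. `RuntimeException` stands in for `SamzaException` here, and `ChainedExceptionSketch`, `writeCheckpointFile`, and `writeWithContext` are hypothetical names; the failing write is simulated.

```java
import java.io.IOException;

final class ChainedExceptionSketch {
  // Simulated write that always fails, standing in for the real file write.
  static void writeCheckpointFile(String storeName) throws IOException {
    throw new IOException("disk full while writing checkpoint for " + storeName);
  }

  static void writeWithContext(String taskName, String storeName, String checkpointId) {
    try {
      writeCheckpointFile(storeName);
    } catch (IOException e) {
      // Pass the caught exception as the cause so its message and stack trace are preserved.
      throw new RuntimeException(String.format(
          "Write checkpoint file failed for task: %s, store: %s, checkpointId: %s",
          taskName, storeName, checkpointId), e);
    }
  }
}
```

The wrapper message carries the task, store, and checkpoint context, while `getCause()` still leads back to the underlying I/O failure.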







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r614976079



##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -280,6 +282,7 @@ class TaskInstance(
       if (timeSinceLastCommit < commitMaxDelayMs) {
         info("Skipping commit for taskName: %s since another commit is in progress. " +
           "%s ms have elapsed since the pending commit started." format (taskName, timeSinceLastCommit))
+        metrics.asyncCommitSkipped.set(numSkippedCommits + 1)

Review comment:
       This is always 1. You probably need to do `metrics.asyncCommitSkipped.set(metrics.asyncCommitSkipped.get + 1)`.
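
To make the bug concrete, here is a sketch in Java (the file under review is Scala). It assumes, as the comment implies, that the local `numSkippedCommits` is never incremented on this path. `IntGauge` is a hypothetical stand-in for the metrics gauge, not the Samza class.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SkippedCommitGaugeSketch {
  // Hypothetical minimal gauge with set/get semantics.
  static final class IntGauge {
    private final AtomicInteger value = new AtomicInteger(0);
    void set(int newValue) { value.set(newValue); }
    int get() { return value.get(); }
  }

  // Mirrors the diff: the gauge is set from a local that stays at 0.
  static int setFromStaleLocal(int skips) {
    IntGauge gauge = new IntGauge();
    int numSkippedCommits = 0; // assumed never incremented
    for (int i = 0; i < skips; i++) {
      gauge.set(numSkippedCommits + 1);
    }
    return gauge.get();
  }

  // Mirrors the suggested fix: read the gauge's current value and add one.
  static int setFromGauge(int skips) {
    IntGauge gauge = new IntGauge();
    for (int i = 0; i < skips; i++) {
      gauge.set(gauge.get() + 1);
    }
    return gauge.get();
  }
}
```

Note that `get` followed by `set` is not atomic, so this form is only safe if a single thread updates the gauge.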

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -335,8 +341,21 @@ class TaskInstance(
         debug("Starting async stage of commit for taskName: %s checkpointId: %s" format (taskName, checkpointId))
 
         try {
+          val uploadStartTimeNs = System.nanoTime()
           val uploadSCMsFuture = commitManager.upload(checkpointId, snapshotSCMs)
 
+          uploadSCMsFuture.whenComplete(new BiConsumer[util.Map[String, util.Map[String, String]], Throwable] {
+            override def accept(t: util.Map[String, util.Map[String, String]], throwable: Throwable): Unit = {
+              if (throwable == null) {
+                metrics.asyncUploadNs.update(System.nanoTime() - uploadStartTimeNs)
+                metrics.asyncUploadsCompleted.inc()
+              } else {
+                debug("Commit upload did not complete successfully for taskName: %s checkpointId: %s with error msg: %s"
+                  format (taskName, checkpointId, throwable.getMessage))
+              }
+            }
+          })

Review comment:
       Add a metric for cleanupNs as well.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
   val flushes = newCounter("flush-calls")
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
-  val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+  val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+  val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+  val asyncUploadsCompleted = newCounter("async-uploads-completed")
+  val asyncUploadNs = newTimer("async-upload-ns")
+  val commitNs = newTimer("commit-ns")

Review comment:
       Does it make sense to have a container-wide version of these metrics for an overview as well (in addition to the per-task version)?

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -280,6 +282,7 @@ class TaskInstance(
       if (timeSinceLastCommit < commitMaxDelayMs) {
         info("Skipping commit for taskName: %s since another commit is in progress. " +
           "%s ms have elapsed since the pending commit started." format (taskName, timeSinceLastCommit))
+        metrics.asyncCommitSkipped.set(numSkippedCommits + 1)

Review comment:
       Also the metric name should just be commitsSkipped / skippedCommits.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -453,6 +472,11 @@ class TaskInstance(
                 "during async stage of commit for taskName: %s checkpointId: %s. New exception logged above. " +
                 "Saved exception under Caused By.", commitException.get())
             }
+          } else {
+            metrics.commitNs.update(System.nanoTime() - commitStartNs)
+            // reset the numbers skipped commits for the current commit
+            numSkippedCommits = 0

Review comment:
       It may not be correct to reset this gauge since the default reporting interval is only 1 minute. E.g., if you set task.commit.ms = 10 secs and skip 5 commits, the reported gauge value may still incorrectly read 0 at the end. Better to leave this as an incrementing only value.
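
The sampling problem can be shown with a tiny simulation. It assumes the reporter reads the gauge once, after the skipped commits and the eventual successful commit have all happened within one reporting interval. `GaugeResetSketch` and `reportedValue` are hypothetical names.

```java
final class GaugeResetSketch {
  // Returns the value a reporter would read at the end of one reporting interval.
  static int reportedValue(int skippedCommits, boolean resetOnSuccess) {
    int gauge = 0;
    for (int i = 0; i < skippedCommits; i++) {
      gauge++; // one increment per skipped commit
    }
    if (resetOnSuccess) {
      gauge = 0; // the commit finally succeeds before the reporter fires
    }
    return gauge;
  }
}
```

With the reset, five skipped commits are reported as zero; with an incrementing-only value, the reporter sees all five and a consumer can derive per-interval deltas.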

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
   val flushes = newCounter("flush-calls")
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
-  val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+  val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+  val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+  val asyncUploadsCompleted = newCounter("async-uploads-completed")
+  val asyncUploadNs = newTimer("async-upload-ns")
+  val commitNs = newTimer("commit-ns")

Review comment:
       Minor: order these hierarchically: commitNs -> [snapshotNs, asyncCommitNs -> [uploadNs, checkpointWriteNs, cleanupNs]]

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
   val flushes = newCounter("flush-calls")
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
-  val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+  val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+  val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+  val asyncUploadsCompleted = newCounter("async-uploads-completed")

Review comment:
       Would it be better to just have commitsSkipped and commitsCompleted for the overall commit? How would you use this metric in isolation?







[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r614965895



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -256,6 +254,10 @@ public ContainerStorageManager(
         containerChangelogSystems, systemFactories, config, this.samzaContainerMetrics.registry());
     this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers);
 
+    // TODO HIGH dchen tune based on observed concurrency
+    this.restoreExecutor = Executors.newFixedThreadPool(containerModel.getTasks().size(),

Review comment:
       @dxichen I think this still needs to be 2x until you make RestoreManager init() and restore() nonblocking.
   
   Where is restore manager init (not store init) called btw? Is this on this executor as well? Since we're deleting old stores during restore init, ideally that should be parallel and nonblocking as well.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -643,15 +652,11 @@ private void restoreStores() throws InterruptedException {
     // Start each store consumer once
     this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
 
-    // Create a thread pool for parallel restores (and stopping of persistent stores)
-    ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
-
     List<Future> taskRestoreFutures = new ArrayList<>(this.taskRestoreManagers.entrySet().size());
 
     // Submit restore callable for each taskInstance
     this.taskRestoreManagers.forEach((taskInstance, taskRestoreManager) -> {
-      taskRestoreFutures.add(executorService.submit(
+      taskRestoreFutures.add(restoreExecutor.submit(

Review comment:
       What's the relationship between parallelRestoreThreadPoolSize and the new executor's pool size? Are they the same?



