You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/01/20 20:05:24 UTC

[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r561204115



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());
+  }
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   * @param id CheckpointId associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of local state store names and StateCheckpointMarkers for each state backend system
+   */
+  public Checkpoint(CheckpointId id, Map<SystemStreamPartition, String> inputOffsets, Map<String, List<StateCheckpointMarker>> stateCheckpoints) {
+    this.checkpointId = id;
+    this.inputOffsets = inputOffsets;
+    this.stateCheckpoints = stateCheckpoints;

Review comment:
       Create an immutable copy of the map.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());
+  }
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   * @param id CheckpointId associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of local state store names and StateCheckpointMarkers for each state backend system
+   */
+  public Checkpoint(CheckpointId id, Map<SystemStreamPartition, String> inputOffsets, Map<String, List<StateCheckpointMarker>> stateCheckpoints) {
+    this.checkpointId = id;
+    this.inputOffsets = inputOffsets;
+    this.stateCheckpoints = stateCheckpoints;
+  }
+
+  /**
+   * Gets the checkpoint id for the checkpoint
+   * @return The timestamp based checkpoint identifier associated with the checkpoint
+   */
+  public CheckpointId getCheckpointId() {
+    return checkpointId;
   }
 
   /**
    * Gets a unmodifiable view of the current Samza stream offsets.
    * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
    */
-  public Map<SystemStreamPartition, String> getOffsets() {
-    return Collections.unmodifiableMap(offsets);
+  public Map<SystemStreamPartition, String> getInputOffsets() {
+    return Collections.unmodifiableMap(inputOffsets);
+  }
+
+  /**
+   * Gets the stateCheckpointMarkers
+   * @return The state checkpoint markers for the checkpoint
+   */
+  public Map<String, List<StateCheckpointMarker>> getStateCheckpointMarkers() {
+    return stateCheckpoints;

Review comment:
       Minor: Return immutable copy.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -41,6 +41,14 @@ public CheckpointId(long millis, long nanos) {
     this.nanos = nanos;

Review comment:
       Can this be a private constructor so that the only way to create checkpoint IDs is through the public static create() method?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarkerSerde.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+
+
+public class StateCheckpointMarkerSerde<T extends StateCheckpointMarker> {
+  private static final short PROTOCOL_VERSION = 1;
+  private static final String SCM_SEPARATOR = ":";
+
+  public String serialize(T payload, StateCheckpointPayloadSerde<T> serde) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(PROTOCOL_VERSION);
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.getClass().getName());
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.serialize(payload));
+
+    return builder.toString();
+  }
+
+  public T deserializePayload(String serializedSCM) {
+    if (StringUtils.isBlank(serializedSCM)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + serializedSCM);
+    }
+    String[] parts = serializedSCM.split(SCM_SEPARATOR);
+    if (parts.length != 3) {
+      throw new IllegalArgumentException("Invalid RemoteStore Metadata offset: " + serializedSCM);

Review comment:
       s/RemoteStore Metadata offset/state checkpoint marker

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarkerSerde.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+
+
+public class StateCheckpointMarkerSerde<T extends StateCheckpointMarker> {
+  private static final short PROTOCOL_VERSION = 1;
+  private static final String SCM_SEPARATOR = ":";
+
+  public String serialize(T payload, StateCheckpointPayloadSerde<T> serde) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(PROTOCOL_VERSION);
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.getClass().getName());
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.serialize(payload));
+
+    return builder.toString();
+  }
+
+  public T deserializePayload(String serializedSCM) {
+    if (StringUtils.isBlank(serializedSCM)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + serializedSCM);
+    }
+    String[] parts = serializedSCM.split(SCM_SEPARATOR);

Review comment:
       The serialized payload on line 36 may contain the separator as well. It's better to split into a fixed-length array (3 parts) and let the payload serde throw an exception if parts[2] cannot be parsed correctly.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}

Review comment:
       Why default?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());

Review comment:
       Why a fake placeholder id instead of a valid but unused id?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
##########
@@ -35,6 +38,16 @@
  * @param <V> the type of values maintained by this key-value store.
  */
 public interface KeyValueStore<K, V> {
+
+  /**
+   * Initiates the KeyValueStore
+   *
+   * @param containerContext context of the KeyValueStore's container
+   * @param jobContext context of the job the KeyValueStore is in
+   * @param externalContext any external store required for initialization
+   */
+  default void init(ContainerContext containerContext, JobContext jobContext, ExternalContext externalContext) { }

Review comment:
       Minor: Reverse order (External, Job, Container) to match scope.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)

Review comment:
       Update message to mention commit instead of flushing stores. Flushing stores step should go into the commit / storage manager instead.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
+    // Commit state and input checkpoints offsets atomically

Review comment:
       Instead of 'commit' which is overloaded in this context, let's explain that this is "Write input offsets and state checkpoint markers to the checkpoint topic atomically"

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
+    // Commit state and input checkpoints offsets atomically
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
-    if (storageManager != null) {
+    // Perform cleanup on unused checkpoints
+    if (commitManager != null) {
       trace("Remove old checkpoint stores for taskName: %s" format taskName)

Review comment:
       Generalize message and just say 'Cleaning up old checkpoint state'. Also include checkpointId in the trace message.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
 
+    // Commit state and input checkpoints offsets atomically
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
-    if (storageManager != null) {
+    // Perform cleanup on unused checkpoints
+    if (commitManager != null) {

Review comment:
       Can this null check be removed?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);
+        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
+      }
+
+      // TODO: call commit on multiple backup managers when available

Review comment:
       I'd recommend handling this (multiple backup managers per store) now to make sure all the interfaces, logging etc are general and consistent.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  private static SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");

Review comment:
       s/defaultFileDir/defaultStoreBaseDir

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {

Review comment:
       Can this be removed in this PR or do we need to keep this around for backwards compatibility?

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -20,62 +20,70 @@
 package org.apache.samza.storage
 
 import java.io._
+import java.util
+import java.util.concurrent.CompletableFuture
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
-import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{CheckpointId, StateCheckpointMarker}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Manage all the storage engines for a given task
  */
-class NonTransactionalStateTaskStorageManager(
+class KafkaNonTransactionalStateTaskBackupManager(
   taskName: TaskName,
-  containerStorageManager: ContainerStorageManager,
-  storeChangelogs: Map[String, SystemStream] = Map(),
+  taskStores: util.Map[String, StorageEngine],
+  storeChangelogs: util.Map[String, SystemStream] = new util.HashMap[String, SystemStream](),
   systemAdmins: SystemAdmins,
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging with TaskStorageManager {
+  partition: Partition) extends Logging with TaskBackupManager {
 
   private val storageManagerUtil = new StorageManagerUtil
-  private val persistedStores = containerStorageManager.getAllStores(taskName).asScala
+  private val persistedStores = taskStores.asScala
     .filter { case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk }
 
-  def getStore(storeName: String): Option[StorageEngine] =  JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
-
-  def flush(): Map[SystemStreamPartition, Option[String]] = {
+  override def snapshot(checkpointId: CheckpointId): util.Map[String, StateCheckpointMarker] = {
     debug("Flushing stores.")
-    containerStorageManager.getAllStores(taskName).asScala.values.foreach(_.flush)
+    taskStores.asScala.values.foreach(_.flush)
     val newestChangelogSSPOffsets = getNewestChangelogSSPOffsets()
-    writeChangelogOffsetFiles(newestChangelogSSPOffsets)
+    writeChangelogOffsetFiles(KafkaStateCheckpointMarker
+      .stateCheckpointMarkerToSSPmap(newestChangelogSSPOffsets))
     newestChangelogSSPOffsets
   }
 
-  override def checkpoint(checkpointId: CheckpointId,
-    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
+  override def upload(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): CompletableFuture[util.Map[String, StateCheckpointMarker]] = {
+     CompletableFuture.completedFuture(stateCheckpointMarkers)
+  }
+
+  override def persistToFilesystem(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): Unit = {}
 
-  override def removeOldCheckpoints(checkpointId: CheckpointId): Unit = {}
+  override def cleanUp(checkpointId: CheckpointId): Unit = {}
 
   @VisibleForTesting
-  def stop() {
+  def close() {

Review comment:
       override

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaTransactionalStateTaskBackupManager.scala
##########
@@ -73,10 +80,11 @@ class TransactionalStateTaskStorageManager(
       }}
       .toMap
 
-    writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, newestChangelogOffsets)
+    writeChangelogOffsetFiles(checkpointPaths, storeChangelogs,
+       KafkaStateCheckpointMarker.stateCheckpointMarkerToSSPmap(stateCheckpointMarkers).asScala)
   }
 
-  def removeOldCheckpoints(latestCheckpointId: CheckpointId): Unit = {
+  def cleanUp(latestCheckpointId: CheckpointId): Unit = {

Review comment:
       override def. Same for the other overridden methods.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null

Review comment:
       s/newestStateCheckpointMarker/stateCheckpointMarkers.
   The "newest" was in the name earlier to indicate that for Kafka changelog topics we need to flush them first to get the latest offset. That is not relevant in this class anymore.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}
+
+  /**
+   * Commit operation that is synchronous to processing
+   * @param checkpointId Checkpoint id of the current commit
+   * @return The storename to checkpoint of the snapshotted local store
+   */
+  Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+  /**
+   * Commit operation that is asynchronous to message processing,
+   * @param checkpointId Checkpoint id of the current commit
+   * @param stateCheckpointMarkers The map of storename to checkpoint makers returned by the snapshot
+   * @return The future of storename to checkpoint map of the uploaded local store
+   */
+  CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Persist the state locally to the file system
+   * @param checkpointId The id of the checkpoint to be committed
+   * @param stateCheckpointMarkers Uploaded storename to checkpoints markers to be persisted locally
+   */
+  void persistToFilesystem(CheckpointId checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+  /**
+   * Cleanup any local or remote state for obsolete checkpoint information that are older than checkpointId
+   * @param checkpointId The id of the latest successfully committed checkpoint
+   */
+  void cleanUp(CheckpointId checkpointId);
+
+  /**
+   * Used for testing as a shutdown hook to cleanup any allocated resources

Review comment:
       Let's remove "used for testing" and make this a part of the public interface to mirror init.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null

Review comment:
       Will the commit manager ever be null? If not, instead of a mutable nullable variable, just return an empty map from commitManager if there are no state checkpoint markers.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -52,7 +52,8 @@ import org.apache.samza.util.{Util, _}
 import org.apache.samza.SamzaException
 import org.apache.samza.clustermanager.StandbyTaskUtil
 
-import scala.collection.JavaConversions
+import scala.collection.JavaConversions.mapAsScalaMap

Review comment:
       JavaConversions is deprecated. Use JavaConverters instead.
   
   https://stackoverflow.com/questions/8301947/what-is-the-difference-between-javaconverters-and-javaconversions-in-scala

##########
File path: samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker implements StateCheckpointMarker {
+  private static final short PROTO_VERSION = 1;
+  private static final String FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  private static final String SEPARATOR = ";";
+
+  // One offset per SSP
+  private final SystemStreamPartition ssp;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition ssp, String changelogOffset) {
+    this.ssp = ssp;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static short getProtocolVersion() {
+    return PROTO_VERSION;
+  }
+
+  public SystemStreamPartition getSsp() {
+    return ssp;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  @Override
+  public String getFactoryName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;
+    return Objects.equals(ssp, that.ssp) &&
+        Objects.equals(changelogOffset, that.changelogOffset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(PROTO_VERSION, ssp, changelogOffset);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s%s%s%s%s%s%s%s%s", PROTO_VERSION, SEPARATOR,
+      ssp.getSystem(), SEPARATOR, ssp.getStream(), SEPARATOR, ssp.getPartition(), SEPARATOR, changelogOffset);
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String message) {
+    if (StringUtils.isBlank(message)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + message);
+    }
+    String[] payload = message.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 5) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker argument count: " + message);
+    }
+    if (Short.parseShort(payload[0]) != PROTO_VERSION) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker protocol version: " + message);
+    }
+    Partition partition = new Partition(Integer.parseInt(payload[3]));
+    String offset = null;
+    if (!"null".equals(payload[4])) {
+      offset = payload[4];
+    }
+
+    return new KafkaStateCheckpointMarker(new SystemStreamPartition(payload[1], payload[2], partition), offset);
+  }
+
+  public static Map<SystemStreamPartition, Option<String>> stateCheckpointMarkerToSSPmap(Map<String, StateCheckpointMarker> markers) {

Review comment:
       Javadoc for public method.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -224,8 +228,8 @@ class TaskInstance(
     val allCheckpointOffsets = new java.util.HashMap[SystemStreamPartition, String]()
     val inputCheckpoint = offsetManager.buildCheckpoint(taskName)
     if (inputCheckpoint != null) {
-      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getOffsets))
-      allCheckpointOffsets.putAll(inputCheckpoint.getOffsets)
+      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getInputOffsets))
+      allCheckpointOffsets.putAll(inputCheckpoint.getInputOffsets)

Review comment:
       It looks like we don't need a separate allCheckpointOffsets map anymore and inputCheckpoint will suffice.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;

Review comment:
       Does this need to be a per store map?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);
+        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
+      }
+
+      // TODO: call commit on multiple backup managers when available
+      return mergeCheckpoints(uploadMap);
+    } catch (TimeoutException e) {
+      throw new SamzaException("Upload timed out, commitTimeoutMs: " + COMMIT_TIMEOUT_MS, e);
+    } catch (Exception e) {
+      throw new SamzaException("Upload commit portion could not be completed", e);

Review comment:
       Include store and task information in error message.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -73,8 +76,9 @@ class TaskInstance(
 
   private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
     (storeName: String) => {
-      if (storageManager != null && storageManager.getStore(storeName).isDefined) {
-        storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]]
+      if (containerStorageManager != null) {
+        val storeOption = JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption

Review comment:
       Probably don't need to convert to java optional here just to do isDefined

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -236,42 +240,33 @@ class TaskInstance(
       tableManager.flush()
     }
 
-    var newestChangelogOffsets: Map[SystemStreamPartition, Option[String]] = null
-    if (storageManager != null) {
-      trace("Flushing state stores for taskName: %s" format taskName)
-      newestChangelogOffsets = storageManager.flush()
-      trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
-    }
-
     val checkpointId = CheckpointId.create()
-    if (storageManager != null && newestChangelogOffsets != null) {
-      trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
-      storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
-    }
-
-    if (newestChangelogOffsets != null) {
-      newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
-        val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
-        allCheckpointOffsets.put(ssp, offset)
-      }
+    var newestStateCheckpointMakers: util.Map[String, util.List[StateCheckpointMarker]] = null
+    // Perform state commit
+    if (commitManager != null) {
+      trace("Flushing state stores for taskName: %s" format taskName)
+      newestStateCheckpointMakers = commitManager.commit(taskName, checkpointId)
+      trace("Got newest state checkpoint markers for taskName: %s as: %s " format(taskName, newestStateCheckpointMakers))
     }
-    val checkpoint = new Checkpoint(allCheckpointOffsets)
+    val checkpoint = new Checkpoint(checkpointId, allCheckpointOffsets, newestStateCheckpointMakers)
     trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))

Review comment:
       Update trace message, log state checkpoint markers as well.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);

Review comment:
       Does this need to be thread safe? Same for method below.

##########
File path: samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
##########
@@ -31,6 +31,7 @@ import java.util.Optional
 
 import com.google.common.annotations.VisibleForTesting
 import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.context.{ContainerContext, Context, ExternalContext, JobContext}

Review comment:
       Unused import?

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {

Review comment:
       Move private methods below public methods.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  private static SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition(),
+          taskModel.getTaskMode(), new StorageManagerUtil());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Map<String, SystemStream> filteredStoreChangelogs = ContainerStorageManager
+        .getChangelogSystemStreams(containerModel, storeChangelogs, null);
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+    File nonLoggedStoreBaseDir = SamzaContainer.getNonLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          taskModel,
+          taskStores,
+          filteredStoreChangelogs,
+          systemAdmins,
+          null, // TODO @dchen have the restore managers create and manage Kafka consume lifecycle
+          KafkaChangelogStateBackendFactory
+              .getSspCache(systemAdmins, clock, Collections.emptySet()),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          taskModel,
+          filteredStoreChangelogs,
+          taskStores,
+          systemAdmins,
+          KafkaChangelogStateBackendFactory.getStreamCache(systemAdmins, clock),
+          null, // TODO  @dchen have the restore managers create and manage Kafka consume lifecycle

Review comment:
       What does this mean? Is this required for this PR to work E2E with existing jobs?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {

Review comment:
       Let's include some more details in the class and method javadocs explaining what these concepts (snapshots, uploads, cleanup etc.) are. Let's assume that the reader has not read the design doc and is not familiar with the new commit process.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review comment:
       It would be better not to include the commit timeout in this version until we have the concurrency handled. Otherwise, large commits might time out and fail the job instead of just stalling processing for some time.

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -20,62 +20,70 @@
 package org.apache.samza.storage
 
 import java.io._
+import java.util
+import java.util.concurrent.CompletableFuture
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
-import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{CheckpointId, StateCheckpointMarker}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Manage all the storage engines for a given task
  */
-class NonTransactionalStateTaskStorageManager(
+class KafkaNonTransactionalStateTaskBackupManager(
   taskName: TaskName,
-  containerStorageManager: ContainerStorageManager,
-  storeChangelogs: Map[String, SystemStream] = Map(),
+  taskStores: util.Map[String, StorageEngine],
+  storeChangelogs: util.Map[String, SystemStream] = new util.HashMap[String, SystemStream](),
   systemAdmins: SystemAdmins,
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging with TaskStorageManager {
+  partition: Partition) extends Logging with TaskBackupManager {
 
   private val storageManagerUtil = new StorageManagerUtil
-  private val persistedStores = containerStorageManager.getAllStores(taskName).asScala
+  private val persistedStores = taskStores.asScala
     .filter { case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk }
 
-  def getStore(storeName: String): Option[StorageEngine] =  JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
-
-  def flush(): Map[SystemStreamPartition, Option[String]] = {
+  override def snapshot(checkpointId: CheckpointId): util.Map[String, StateCheckpointMarker] = {
     debug("Flushing stores.")
-    containerStorageManager.getAllStores(taskName).asScala.values.foreach(_.flush)
+    taskStores.asScala.values.foreach(_.flush)
     val newestChangelogSSPOffsets = getNewestChangelogSSPOffsets()
-    writeChangelogOffsetFiles(newestChangelogSSPOffsets)
+    writeChangelogOffsetFiles(KafkaStateCheckpointMarker
+      .stateCheckpointMarkerToSSPmap(newestChangelogSSPOffsets))
     newestChangelogSSPOffsets
   }
 
-  override def checkpoint(checkpointId: CheckpointId,
-    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
+  override def upload(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): CompletableFuture[util.Map[String, StateCheckpointMarker]] = {
+     CompletableFuture.completedFuture(stateCheckpointMarkers)
+  }
+
+  override def persistToFilesystem(checkpointId: CheckpointId,

Review comment:
       Why is this a no-op? Isn't this what should be calling `writeChangelogOffsetFiles`?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);

Review comment:
       Add a trace log of the returned SCMs (StateCheckpointMarkers).

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);
+        storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
+      }
+
+      // TODO: call commit on multiple backup managers when available
+      return mergeCheckpoints(uploadMap);
+    } catch (TimeoutException e) {
+      throw new SamzaException("Upload timed out, commitTimeoutMs: " + COMMIT_TIMEOUT_MS, e);
+    } catch (Exception e) {
+      throw new SamzaException("Upload commit portion could not be completed", e);
+    }
+  }
+
+  public void cleanUp(CheckpointId checkpointId) {
+    storageBackupManager.cleanUp(checkpointId);
+  }
+
+  private Map<String, List<StateCheckpointMarker>> mergeCheckpoints(Map<String, StateCheckpointMarker>... stateCheckpoints) {
+    if (stateCheckpoints == null || stateCheckpoints.length < 1) {
+      return null;
+    }
+    Map<String, StateCheckpointMarker> firstCheckpoint = stateCheckpoints[0];
+    if (firstCheckpoint == null) {
+      return null;

Review comment:
       Return empty collections instead of nulls wherever applicable. Avoids accidental NPEs.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+  private static final long COMMIT_TIMEOUT_MS = 30000;
+  private final TaskBackupManager storageBackupManager;
+
+  public TaskStorageCommitManager(TaskBackupManager storageBackupManager) {
+    this.storageBackupManager = storageBackupManager;
+  }
+
+  /**
+   * Commits the local state on the remote backup implementation
+   * @return Committed StoreName to StateCheckpointMarker mappings of the committed SSPs
+   */
+  public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName, CheckpointId checkpointId) {
+    Map<String, StateCheckpointMarker> snapshot = storageBackupManager.snapshot(checkpointId);
+    CompletableFuture<Map<String, StateCheckpointMarker>>
+        uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
+
+    try {
+      // TODO: Make async with andThen and add thread management for concurrency
+      Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get(COMMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      if (uploadMap != null) {
+        LOG.trace("Checkpointing stores for taskName: {}} with checkpoint id: {}", taskName, checkpointId);

Review comment:
       Clarify the log message to say what this step is doing ("persisting ..." instead of the more general "checkpointing ...").

##########
File path: samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker implements StateCheckpointMarker {
+  private static final short PROTO_VERSION = 1;
+  private static final String FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  private static final String SEPARATOR = ";";
+
+  // One offset per SSP
+  private final SystemStreamPartition ssp;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition ssp, String changelogOffset) {
+    this.ssp = ssp;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static short getProtocolVersion() {
+    return PROTO_VERSION;
+  }
+
+  public SystemStreamPartition getSsp() {
+    return ssp;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  @Override
+  public String getFactoryName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public boolean equals(Object o) {

Review comment:
       Minor: Move equals/hashCode/toString below the other public methods.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());

Review comment:
       Minor: Use Collections.EMPTY_MAP (or another immutable empty map) so that this cannot be accidentally modified.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {

Review comment:
       Clarify in javadocs or inline comment at call site what we're doing here (sharing metadata cache across all state backend instances)

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}

Review comment:
       Do we need to pass in the Context classes to init?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarkerSerde.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+
+
+public class StateCheckpointMarkerSerde<T extends StateCheckpointMarker> {
+  private static final short PROTOCOL_VERSION = 1;
+  private static final String SCM_SEPARATOR = ":";
+
+  public String serialize(T payload, StateCheckpointPayloadSerde<T> serde) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(PROTOCOL_VERSION);
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.getClass().getName());
+    builder.append(SCM_SEPARATOR);
+    builder.append(serde.serialize(payload));
+
+    return builder.toString();
+  }
+
+  public T deserializePayload(String serializedSCM) {
+    if (StringUtils.isBlank(serializedSCM)) {
+      throw new IllegalArgumentException("Invalid remote store checkpoint message: " + serializedSCM);
+    }
+    String[] parts = serializedSCM.split(SCM_SEPARATOR);
+    if (parts.length != 3) {
+      throw new IllegalArgumentException("Invalid RemoteStore Metadata offset: " + serializedSCM);

Review comment:
       Also let's make sure the call site logs/includes the source task + store name information when this exception happens.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org