You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/07/07 18:23:19 UTC

[incubator-heron] branch master updated: Refactor StatefulStorage API (#2891)

This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 581344e  Refactor StatefulStorage API (#2891)
581344e is described below

commit 581344ea42033a2f5e826d487c355061cee58241
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Sat Jul 7 11:23:16 2018 -0700

    Refactor StatefulStorage API (#2891)
    
    * Refactor StatefulStorage
    
    * fix test
    
    * fix CheckpointManagerServerTest
    
    * component metadata is optional
    
    * Update names to be more consistent
    
    * remove data_version and partition_id from protobuf for now
    
    * Remove metadata field from save/get checkpoint requests
    
    * More name and comment changes
    
    * revert refactor changes in CheckpointManagerServer
    
    * Rename InstanceStateCheckpointPartition to InstanceStateCheckpoint
---
 .../apache/heron/ckptmgr/CheckpointManager.java    |  2 +-
 .../heron/ckptmgr/CheckpointManagerServer.java     | 69 ++++++++++++--------
 .../heron/ckptmgr/CheckpointManagerServerTest.java | 32 ++++++----
 heron/proto/ckptmgr.proto                          | 43 +++++++++----
 .../heron/spi/statefulstorage/Checkpoint.java      | 38 ++---------
 .../{Checkpoint.java => CheckpointInfo.java}       | 48 ++++++--------
 .../spi/statefulstorage/CheckpointMetadata.java    | 47 ++++++++++++++
 .../spi/statefulstorage/IStatefulStorage.java      | 74 +++++++++++++++-------
 .../heron/statefulstorage/dlog/DlogStorage.java    | 48 +++++++++-----
 .../heron/statefulstorage/hdfs/HDFSStorage.java    | 66 +++++++++++--------
 .../localfs/LocalFileSystemStorage.java            | 71 ++++++++++++---------
 .../statefulstorage/dlog/DlogStorageTest.java      | 43 +++++--------
 .../statefulstorage/hdfs/HDFSStorageTest.java      | 25 ++++----
 .../localfs/LocalFileSystemStorageTest.java        | 16 +++--
 heron/stmgr/src/cpp/manager/ckptmgr-client.cpp     |  3 +-
 heron/stmgr/src/cpp/manager/ckptmgr-client.h       |  3 +-
 heron/stmgr/src/cpp/manager/instance-server.h      |  2 +-
 heron/stmgr/src/cpp/manager/stateful-restorer.cpp  |  4 +-
 heron/stmgr/src/cpp/manager/stmgr.h                |  5 +-
 .../tests/cpp/server/dummy_ckptmgr_client.cpp      |  9 +--
 .../stmgr/tests/cpp/server/dummy_instance_server.h |  2 +-
 21 files changed, 385 insertions(+), 265 deletions(-)

diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
index 5c590ff..4798386 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
@@ -193,7 +193,7 @@ public class CheckpointManager {
     }
 
     try {
-      statefulStorage.init(
+      statefulStorage.init(topologyName,
           Collections.unmodifiableMap(checkpointManagerConfig.getStatefulStorageConfig()));
     } catch (StatefulStorageException e) {
       throw new CheckpointManagerException(classname + " init threw exception", e);
diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java
index 46ce36b..4b4654a 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java
@@ -34,6 +34,7 @@ import org.apache.heron.common.network.REQID;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.Common;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;
 
@@ -48,9 +49,14 @@ public class CheckpointManagerServer extends HeronServer {
   private SocketChannel connection;
 
   public CheckpointManagerServer(
-      String topologyName, String topologyId, String checkpointMgrId,
-      IStatefulStorage statefulStorage, NIOLooper looper, String host,
-      int port, HeronSocketOptions options) {
+      String topologyName,
+      String topologyId,
+      String checkpointMgrId,
+      IStatefulStorage statefulStorage,
+      NIOLooper looper,
+      String host,
+      int port,
+      HeronSocketOptions options) {
     super(looper, host, port, options);
 
     this.topologyName = topologyName;
@@ -115,8 +121,7 @@ public class CheckpointManagerServer extends HeronServer {
     String errorMessage = "";
 
     try {
-      statefulStorage.dispose(topologyName,
-                                 request.getOldestCheckpointPreserved(), deleteAll);
+      statefulStorage.dispose(request.getOldestCheckpointPreserved(), deleteAll);
       LOG.info("Dispose checkpoint successful");
     } catch (StatefulStorageException e) {
       errorMessage = String.format("Request to dispose checkpoint failed for oldest Checkpoint "
@@ -203,25 +208,30 @@ public class CheckpointManagerServer extends HeronServer {
       SocketChannel channel,
       CheckpointManager.SaveInstanceStateRequest request
   ) {
-    Checkpoint checkpoint = new Checkpoint(topologyName, request.getInstance(),
-                                           request.getCheckpoint());
+    CheckpointInfo info =
+        new CheckpointInfo(request.getCheckpoint().getCheckpointId(),
+                           request.getInstance());
+    Checkpoint checkpoint = new Checkpoint(request.getCheckpoint());
+
     LOG.info(String.format("Got a save checkpoint request for checkpointId %s "
-                           + " component %s instance %s on connection %s",
-                           checkpoint.getCheckpointId(), checkpoint.getComponent(),
-                           checkpoint.getInstance(), channel.socket().getRemoteSocketAddress()));
+                           + " component %s instanceId %s on connection %s",
+                           info.getCheckpointId(),
+                           info.getComponent(),
+                           info.getInstanceId(),
+                           channel.socket().getRemoteSocketAddress()));
 
     Common.StatusCode statusCode = Common.StatusCode.OK;
     String errorMessage = "";
     try {
-      statefulStorage.store(checkpoint);
-      LOG.info(String.format("Saved checkpoint for checkpointId %s compnent %s instance %s",
-                             checkpoint.getCheckpointId(), checkpoint.getComponent(),
-                             checkpoint.getInstance()));
+      statefulStorage.storeCheckpoint(info, checkpoint);
+      LOG.info(String.format("Saved checkpoint for checkpointId %s compnent %s instanceId %s",
+                             info.getCheckpointId(), info.getComponent(),
+                             info.getInstanceId()));
     } catch (StatefulStorageException e) {
       errorMessage = String.format("Save checkpoint not successful for checkpointId "
-                                   + "%s component %s instance %s",
-                                   checkpoint.getCheckpointId(), checkpoint.getComponent(),
-                                   checkpoint.getInstance());
+                                   + "%s component %s instanceId %s",
+                                   info.getCheckpointId(), info.getComponent(),
+                                   info.getInstanceId());
       statusCode = Common.StatusCode.NOTOK;
       LOG.log(Level.WARNING, errorMessage, e);
     }
@@ -241,11 +251,12 @@ public class CheckpointManagerServer extends HeronServer {
       SocketChannel channel,
       CheckpointManager.GetInstanceStateRequest request
   ) {
+    CheckpointInfo info = new CheckpointInfo(request.getCheckpointId(), request.getInstance());
     LOG.info(String.format("Got a get checkpoint request for checkpointId %s "
-                           + " component %s taskId %d on connection %s",
-                           request.getCheckpointId(),
-                           request.getInstance().getInfo().getComponentName(),
-                           request.getInstance().getInfo().getTaskId(),
+                           + " component %s instanceId %d on connection %s",
+                           info.getCheckpointId(),
+                           info.getComponent(),
+                           info.getInstanceId(),
                            channel.socket().getRemoteSocketAddress()));
 
     CheckpointManager.GetInstanceStateResponse.Builder responseBuilder =
@@ -265,18 +276,20 @@ public class CheckpointManagerServer extends HeronServer {
       responseBuilder.setCheckpoint(dummyState);
     } else {
       try {
-        Checkpoint checkpoint = statefulStorage.restore(topologyName, request.getCheckpointId(),
-                                                request.getInstance());
+        Checkpoint checkpoint = statefulStorage.restoreCheckpoint(info);
         LOG.info(String.format("Get checkpoint successful for checkpointId %s "
-                               + "component %s taskId %d", checkpoint.getCheckpointId(),
-                               checkpoint.getComponent(), checkpoint.getTaskId()));
+                               + "component %s instanceId %d",
+                               info.getCheckpointId(),
+                               info.getComponent(),
+                               info.getInstanceId()));
         // Set the checkpoint-state in response
         responseBuilder.setCheckpoint(checkpoint.getCheckpoint());
       } catch (StatefulStorageException e) {
         errorMessage = String.format("Get checkpoint not successful for checkpointId %s "
-                                     + "component %s taskId %d", request.getCheckpointId(),
-                                     request.getInstance().getInfo().getComponentName(),
-                                     request.getInstance().getInfo().getTaskId());
+                                     + "component %s instanceId %d",
+                                     info.getCheckpointId(),
+                                     info.getComponent(),
+                                     info.getInstanceId());
         LOG.log(Level.WARNING, errorMessage, e);
         statusCode = Common.StatusCode.NOTOK;
       }
diff --git a/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java b/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java
index 1ce6bff..c6bffc7 100644
--- a/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java
+++ b/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java
@@ -38,6 +38,7 @@ import org.apache.heron.common.testhelpers.HeronServerTester;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 
 import static org.apache.heron.common.testhelpers.HeronServerTester.RESPONSE_RECEIVED_TIMEOUT;
@@ -56,7 +57,8 @@ public class CheckpointManagerServerTest {
   private static final String CHECKPOINT_ID = "checkpoint_id";
   private static final String CHECKPOINT_MANAGER_ID = "ckptmgr_id";
 
-  private static CheckpointManager.InstanceStateCheckpoint instanceStateCheckpoint;
+  private static CheckpointManager.InstanceStateCheckpoint checkpointPartition;
+  private static CheckpointManager.CheckpointComponentMetadata checkpointComponentMetadata;
   private static CheckpointManager.SaveInstanceStateRequest saveInstanceStateRequest;
   private static CheckpointManager.GetInstanceStateRequest getInstanceStateRequest;
   private static CheckpointManager.CleanStatefulCheckpointRequest cleanStatefulCheckpointRequest;
@@ -90,14 +92,19 @@ public class CheckpointManagerServerTest {
         .setInfo(info)
         .build();
 
-    instanceStateCheckpoint = CheckpointManager.InstanceStateCheckpoint.newBuilder()
-            .setCheckpointId(CHECKPOINT_ID)
-            .setState(ByteString.copyFrom(BYTES))
-            .build();
+    checkpointPartition = CheckpointManager.InstanceStateCheckpoint.newBuilder()
+        .setCheckpointId(CHECKPOINT_ID)
+        .setState(ByteString.copyFrom(BYTES))
+        .build();
+
+    checkpointComponentMetadata = CheckpointManager.CheckpointComponentMetadata.newBuilder()
+        .setComponentName(COMPONENT_NAME)
+        .setParallelism(2)
+        .build();
 
     saveInstanceStateRequest = CheckpointManager.SaveInstanceStateRequest.newBuilder()
         .setInstance(instance)
-        .setCheckpoint(instanceStateCheckpoint)
+        .setCheckpoint(checkpointPartition)
         .build();
 
     getInstanceStateRequest = CheckpointManager.GetInstanceStateRequest.newBuilder()
@@ -154,7 +161,8 @@ public class CheckpointManagerServerTest {
               @Override
               public void handleResponse(HeronClient client, StatusCode status,
                                          Object ctx, Message response) throws Exception {
-                verify(statefulStorage).store(any(Checkpoint.class));
+                verify(statefulStorage).storeCheckpoint(
+                    any(CheckpointInfo.class), any(Checkpoint.class));
                 assertEquals(CHECKPOINT_ID,
                     ((CheckpointManager.SaveInstanceStateResponse) response).getCheckpointId());
                 assertEquals(instance,
@@ -166,8 +174,10 @@ public class CheckpointManagerServerTest {
 
   @Test
   public void testGetInstanceState() throws Exception {
-    final Checkpoint checkpoint = new Checkpoint(TOPOLOGY_NAME, instance, instanceStateCheckpoint);
-    when(statefulStorage.restore(TOPOLOGY_NAME, CHECKPOINT_ID, instance)).thenReturn(checkpoint);
+    final CheckpointInfo info = new CheckpointInfo(CHECKPOINT_ID, instance);
+    final Checkpoint checkpoint = new Checkpoint(checkpointPartition);
+    when(statefulStorage.restoreCheckpoint(any(CheckpointInfo.class)))
+        .thenReturn(checkpoint);
 
     runTest(TestRequestHandler.RequestType.GET_INSTANCE_STATE,
         new HeronServerTester.SuccessResponseHandler(
@@ -176,7 +186,7 @@ public class CheckpointManagerServerTest {
               @Override
               public void handleResponse(HeronClient client, StatusCode status,
                                          Object ctx, Message response) throws Exception {
-                verify(statefulStorage).restore(TOPOLOGY_NAME, CHECKPOINT_ID, instance);
+                verify(statefulStorage).restoreCheckpoint(info);
                 assertEquals(checkpoint.getCheckpoint(),
                     ((CheckpointManager.GetInstanceStateResponse) response).getCheckpoint());
               }
@@ -193,7 +203,7 @@ public class CheckpointManagerServerTest {
               @Override
               public void handleResponse(HeronClient client, StatusCode status,
                                          Object ctx, Message response) throws Exception {
-                verify(statefulStorage).dispose(anyString(), anyString(), anyBoolean());
+                verify(statefulStorage).dispose(anyString(), anyBoolean());
               }
             })
     );
diff --git a/heron/proto/ckptmgr.proto b/heron/proto/ckptmgr.proto
index daf5556..68935cc 100644
--- a/heron/proto/ckptmgr.proto
+++ b/heron/proto/ckptmgr.proto
@@ -82,7 +82,9 @@ message StatefulConsistentCheckpoints {
   repeated StatefulConsistentCheckpoint consistent_checkpoints = 1;
 }
 
-// tmaster -> stmgr messages
+/*
+ * tmaster <-> stmgr messages
+ */
 
 // Message sent to stmgrs by the tmaster to initiate checkpointing
 message StartStatefulCheckpoint {
@@ -140,7 +142,9 @@ message StartStmgrStatefulProcessing {
   required string checkpoint_id = 1;
 }
 
-// tmaster -> ckptmgr messages
+/*
+ * tmaster -> ckptmgr messages
+ */
 
 // This is the message that a tmaster sends
 // when it wants to register itself
@@ -171,7 +175,23 @@ message CleanStatefulCheckpointResponse {
   repeated string cleaned_checkpoint_ids = 2;
 }
 
-// stmgr -> ckptmgr messages
+/*
+ * stmgr -> ckptmgr messages
+ */
+
+// This message encapsulates the info associated with
+// state of an instance/partition
+message InstanceStateCheckpoint {
+  required string checkpoint_id = 1;
+  required bytes state = 2;
+}
+
+// This message encapsulates the info associated with
+// checkpoint metadata of a component
+message CheckpointComponentMetadata {
+  required string component_name = 1;
+  required int32 parallelism = 2;
+}
 
 // This is the message that a stmgr sends
 // when it wants to register itself with
@@ -194,6 +214,8 @@ message RegisterStMgrResponse {
 message SaveInstanceStateRequest {
   // Information about the instance whose state this is
   required heron.proto.system.Instance instance = 1;
+  // TODO(nwang): currently repartition is not supported and each request
+  //     contains a single partition.
   required InstanceStateCheckpoint checkpoint = 2;
 }
 
@@ -221,7 +243,9 @@ message GetInstanceStateResponse {
   optional InstanceStateCheckpoint checkpoint = 4;
 }
 
-// stmgr -> Instance messages
+/*
+ * stmgr -> Instance messages
+ */
 
 // This is the message that the stmgr sends to its
 // local tasks to begin initiating checkpoint
@@ -229,13 +253,6 @@ message InitiateStatefulCheckpoint {
   required string checkpoint_id = 1;
 }
 
-// This proto encapsulates the info associated with
-// state of an instance
-message InstanceStateCheckpoint {
-  required string checkpoint_id = 1;
-  required bytes state = 2;
-}
-
 // This is the message that the instance sends
 // when it wants stmgr to store its state
 message StoreInstanceStateCheckpoint {
@@ -262,7 +279,9 @@ message StartInstanceStatefulProcessing {
   required string checkpoint_id = 1;
 }
 
-// stmgr -> stmgr messages
+/*
+ * stmgr -> stmgr messages
+ */
 
 // Once stmgr receives StoreInstanceStateCheckpoint from an instance,
 // it sends this message to all of that instance's 
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
index 5796d15..029f4f6 100644
--- a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
@@ -20,51 +20,27 @@
 package org.apache.heron.spi.statefulstorage;
 
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
 
+/**
+ * The checkpoint data from an instance. It contains the instance information and the checkpoint
+ * data for the instance.
+ * TODO(nwang): Currently each instance has only one partition.
+ */
 public class Checkpoint {
-  private final String topologyName;
-  private final String checkpointId;
-  private final PhysicalPlans.Instance instanceInfo;
-
   private CheckpointManager.InstanceStateCheckpoint checkpoint;
   private int nBytes;
 
-  public Checkpoint(String topologyName, PhysicalPlans.Instance instanceInfo,
-                    CheckpointManager.InstanceStateCheckpoint checkpoint) {
-    this.topologyName = topologyName;
-    this.checkpointId = checkpoint.getCheckpointId();
-    this.instanceInfo = instanceInfo;
+  public Checkpoint(CheckpointManager.InstanceStateCheckpoint checkpoint) {
     this.checkpoint = checkpoint;
     this.nBytes = checkpoint.getSerializedSize();
   }
 
-  public String getTopologyName() {
-    return topologyName;
-  }
-
-  public String getCheckpointId() {
-    return checkpointId;
-  }
-
-  public String getComponent() {
-    return instanceInfo.getInfo().getComponentName();
-  }
-
-  public String getInstance() {
-    return instanceInfo.getInstanceId();
-  }
-
-  public int getTaskId() {
-    return instanceInfo.getInfo().getTaskId();
-  }
-
   public CheckpointManager.InstanceStateCheckpoint getCheckpoint() {
     return this.checkpoint;
   }
 
   @Override
   public String toString() {
-    return String.format("%s %s %s %s", topologyName, checkpointId, getComponent(), getInstance());
+    return String.format("Checkpoint(%d bytes)", nBytes);
   }
 }
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointInfo.java
similarity index 50%
copy from heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
copy to heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointInfo.java
index 5796d15..38c6902 100644
--- a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointInfo.java
@@ -19,28 +19,25 @@
 
 package org.apache.heron.spi.statefulstorage;
 
-import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.PhysicalPlans;
 
-public class Checkpoint {
-  private final String topologyName;
+/**
+ * The information for one checkpoint blob. It can be used to reference checkpoint and
+ * metadata (instance id is ignored).
+ */
+public class CheckpointInfo {
+  // The checkpoint ID under the topology.
   private final String checkpointId;
-  private final PhysicalPlans.Instance instanceInfo;
+  // The name of the component.
+  private final String componentName;
+  // TODO(nwang): Currently only support one partition per instance.
+  private final int instanceId;
 
-  private CheckpointManager.InstanceStateCheckpoint checkpoint;
-  private int nBytes;
-
-  public Checkpoint(String topologyName, PhysicalPlans.Instance instanceInfo,
-                    CheckpointManager.InstanceStateCheckpoint checkpoint) {
-    this.topologyName = topologyName;
-    this.checkpointId = checkpoint.getCheckpointId();
-    this.instanceInfo = instanceInfo;
-    this.checkpoint = checkpoint;
-    this.nBytes = checkpoint.getSerializedSize();
-  }
+  public CheckpointInfo(String checkpointId, PhysicalPlans.Instance instance) {
 
-  public String getTopologyName() {
-    return topologyName;
+    this.checkpointId = checkpointId;
+    this.componentName = instance.getInfo().getComponentName();
+    this.instanceId = instance.getInfo().getComponentIndex();
   }
 
   public String getCheckpointId() {
@@ -48,23 +45,16 @@ public class Checkpoint {
   }
 
   public String getComponent() {
-    return instanceInfo.getInfo().getComponentName();
-  }
-
-  public String getInstance() {
-    return instanceInfo.getInstanceId();
-  }
-
-  public int getTaskId() {
-    return instanceInfo.getInfo().getTaskId();
+    return componentName;
   }
 
-  public CheckpointManager.InstanceStateCheckpoint getCheckpoint() {
-    return this.checkpoint;
+  public int getInstanceId() {
+    return instanceId;
   }
 
   @Override
   public String toString() {
-    return String.format("%s %s %s %s", topologyName, checkpointId, getComponent(), getInstance());
+    return String.format("CheckpointInfo(%s %s %d)",
+                         checkpointId, componentName, instanceId);
   }
 }
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointMetadata.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointMetadata.java
new file mode 100644
index 0000000..4888ebf
--- /dev/null
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointMetadata.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.spi.statefulstorage;
+
+import org.apache.heron.proto.ckptmgr.CheckpointManager.CheckpointComponentMetadata;
+
+/**
+ * The checkpoint metadata for a component.
+ */
+public class CheckpointMetadata {
+  private CheckpointComponentMetadata metadata;
+  private int nBytes;
+
+  public CheckpointMetadata(String componentName, int parallelism) {
+    this.metadata = CheckpointComponentMetadata.newBuilder()
+      .setComponentName(componentName)
+      .setParallelism(parallelism)
+      .build();
+    this.nBytes = this.metadata.getSerializedSize();
+  }
+
+  public CheckpointComponentMetadata getMetadata() {
+    return metadata;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("CheckpointMetadata(%d bytes)", nBytes);
+  }
+}
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
index bd0965f..0698bf4 100644
--- a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
@@ -21,37 +21,67 @@ package org.apache.heron.spi.statefulstorage;
 
 import java.util.Map;
 
-import org.apache.heron.proto.system.PhysicalPlans;
-
+/**
+ * The interface of all storage classes for checkpoints.
+ * For each checkpoint, two types of data are stored:
+ * - Component Meta Data (one per component).
+ * - Instance Checkpoint Data (one per instance or patition)
+ * Each Stateful Storage implementation needs to handle them accordingly.
+ */
 public interface IStatefulStorage {
   /**
    * Initialize the Stateful Storage
-   *
+   * @param topologyName The name of the topology.
    * @param conf An unmodifiableMap containing basic configuration
-   * Attempts to modify the returned map,
-   * whether direct or via its collection views, result in an UnsupportedOperationException.
    */
-  void init(Map<String, Object> conf) throws StatefulStorageException;
+  void init(String topologyName, final Map<String, Object> conf)
+      throws StatefulStorageException;
 
   /**
    * Closes the Stateful Storage
    */
   void close();
 
-  // Store the checkpoint
-  void store(Checkpoint checkpoint) throws StatefulStorageException;
-
-  // Retrieve the checkpoint
-  Checkpoint restore(String topologyName, String checkpointId,
-                     PhysicalPlans.Instance instanceInfo) throws StatefulStorageException;
-
-  // TODO(mfu): We should refactor all interfaces in IStatefulStorage,
-  // TODO(mfu): instead providing Class Checkpoint, we should provide an Context class,
-  // TODO(mfu): It should:
-  // TODO(mfu): 1. Provide meta data access, like topologyName
-  // TODO(mfu): 2. Provide utils method to parse the protobuf object, like getTaskId()
-  // TODO(mfu): 3. Common methods, like getCheckpointDir()
-  // Dispose the checkpoint
-  void dispose(String topologyName, String oldestCheckpointId, boolean deleteAll)
-                  throws StatefulStorageException;
+  /**
+   * Store instance checkpoint.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @param checkpoint The checkpoint data.
+   */
+  void storeCheckpoint(final CheckpointInfo info, final Checkpoint checkpoint)
+      throws StatefulStorageException;
+
+  /**
+   * Retrieve instance checkpoint.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @return The checkpoint data from the specified blob id.
+   */
+  Checkpoint restoreCheckpoint(final CheckpointInfo info)
+        throws StatefulStorageException;
+
+  /**
+   * Store medata data for component. Ideally in distributed storages this function should only
+   * be called once for each component. In local storages, the function should be called by
+   * every instance and the data should be stored with the checkpoint data for each partition.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @param metadata The checkpoint metadata from a component.
+   */
+  void storeComponentMetaData(final CheckpointInfo info, final CheckpointMetadata metadata)
+      throws StatefulStorageException;
+
+  /**
+   * Retrieve component metadata.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @return The checkpoint metadata for the component.
+   */
+  CheckpointMetadata restoreComponentMetadata(final CheckpointInfo info)
+      throws StatefulStorageException;
+
+  /**
+   * Dispose checkpoints.
+   * @param oldestCheckpointPreserved The oldest checkpoint id to be preserved. All checkpoints
+   *        before this id should be deleted.
+   * @param deleteAll Ignore the checkpoint Id and delete all checkpoints.
+   */
+  void dispose(String oldestCheckpointPreserved, boolean deleteAll)
+      throws StatefulStorageException;
 }
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
index a0d3247..9999f36 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
@@ -37,8 +37,9 @@ import org.apache.heron.common.basics.SysUtils;
 import org.apache.heron.dlog.DLInputStream;
 import org.apache.heron.dlog.DLOutputStream;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
+import org.apache.heron.spi.statefulstorage.CheckpointMetadata;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;
 
@@ -52,6 +53,7 @@ public class DlogStorage implements IStatefulStorage {
   private String checkpointNamespaceUriStr;
   private URI checkpointNamespaceUri;
   private int numReplicas = 3;
+  private String topologyName;
 
   // the namespace instance
   private final Supplier<NamespaceBuilder> nsBuilderSupplier;
@@ -66,11 +68,13 @@ public class DlogStorage implements IStatefulStorage {
   }
 
   @Override
-  public void init(Map<String, Object> conf) throws StatefulStorageException {
+  public void init(String topology, Map<String, Object> conf)
+      throws StatefulStorageException {
 
     LOG.info("Initializing ... Config: " + conf.toString());
     LOG.info("Class path: " + System.getProperty("java.class.path"));
 
+    this.topologyName = topology;
     checkpointNamespaceUriStr = (String) conf.get(NS_URI_KEY);
     checkpointNamespaceUri = URI.create(checkpointNamespaceUriStr);
     Integer numReplicasValue = (Integer) conf.get(NUM_REPLICAS_KEY);
@@ -128,12 +132,13 @@ public class DlogStorage implements IStatefulStorage {
   }
 
   @Override
-  public void store(Checkpoint checkpoint) throws StatefulStorageException {
+  public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
+      throws StatefulStorageException {
     String checkpointPath = getCheckpointPath(
-        checkpoint.getTopologyName(),
-        checkpoint.getCheckpointId(),
-        checkpoint.getComponent(),
-        checkpoint.getTaskId());
+        topologyName,
+        info.getCheckpointId(),
+        info.getComponent(),
+        info.getInstanceId());
 
     OutputStream out = null;
     try {
@@ -147,15 +152,13 @@ public class DlogStorage implements IStatefulStorage {
   }
 
   @Override
-  public Checkpoint restore(String topologyName,
-                            String checkpointId,
-                            PhysicalPlans.Instance instanceInfo)
+  public Checkpoint restoreCheckpoint(CheckpointInfo info)
       throws StatefulStorageException {
     String checkpointPath = getCheckpointPath(
         topologyName,
-        checkpointId,
-        instanceInfo.getInfo().getComponentName(),
-        instanceInfo.getInfo().getTaskId());
+        info.getCheckpointId(),
+        info.getComponent(),
+        info.getInstanceId());
 
     InputStream in = null;
     CheckpointManager.InstanceStateCheckpoint state;
@@ -168,13 +171,24 @@ public class DlogStorage implements IStatefulStorage {
       SysUtils.closeIgnoringExceptions(in);
     }
 
-    return new Checkpoint(topologyName, instanceInfo, state);
+    return new Checkpoint(state);
+  }
+
+  @Override
+  public void storeComponentMetaData(CheckpointInfo info, CheckpointMetadata metadata)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+  }
+
+  @Override
+  public CheckpointMetadata restoreComponentMetadata(CheckpointInfo info)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+    return null;
   }
 
   @Override
-  public void dispose(String topologyName,
-                      String oldestCheckpointId,
-                      boolean deleteAll)
+  public void dispose(String oldestCheckpointId, boolean deleteAll)
       throws StatefulStorageException {
 
     // Currently dlog doesn't support recursive deletion. so we have to fetch all the checkpoints
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java
index 1c4bc0a..a1df8e8 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java
@@ -31,8 +31,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.heron.common.basics.SysUtils;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
+import org.apache.heron.spi.statefulstorage.CheckpointMetadata;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;
 
@@ -46,12 +47,15 @@ public class HDFSStorage implements IStatefulStorage {
 
   private String checkpointRootPath;
   private FileSystem fileSystem;
+  private String topologyName;
 
   @Override
-  public void init(Map<String, Object> conf) throws StatefulStorageException {
+  public void init(String topology, final Map<String, Object> conf)
+      throws StatefulStorageException {
     LOG.info("Initializing... Config: " + conf.toString());
     LOG.info("Class path: " + System.getProperty("java.class.path"));
 
+    this.topologyName = topology;
     checkpointRootPath = (String) conf.get(ROOT_PATH_KEY);
 
     // Notice, we pass the config folder via classpath
@@ -73,17 +77,16 @@ public class HDFSStorage implements IStatefulStorage {
   }
 
   @Override
-  public void store(Checkpoint checkpoint) throws StatefulStorageException {
-    Path path = new Path(getCheckpointPath(checkpoint.getTopologyName(),
-                                           checkpoint.getCheckpointId(),
-                                           checkpoint.getComponent(),
-                                           checkpoint.getTaskId()));
+  public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
+      throws StatefulStorageException {
+    Path path = new Path(getCheckpointPath(info.getCheckpointId(),
+                                           info.getComponent(),
+                                           info.getInstanceId()));
 
     // We need to ensure the existence of directories structure,
     // since it is not guaranteed that FileSystem.create(..) always creates parents' dirs.
-    String checkpointDir = getCheckpointDir(checkpoint.getTopologyName(),
-                                            checkpoint.getCheckpointId(),
-                                            checkpoint.getComponent());
+    String checkpointDir = getCheckpointDir(info.getCheckpointId(),
+                                            info.getComponent());
     createDir(checkpointDir);
 
     FSDataOutputStream out = null;
@@ -98,11 +101,11 @@ public class HDFSStorage implements IStatefulStorage {
   }
 
   @Override
-  public Checkpoint restore(String topologyName, String checkpointId,
-                            PhysicalPlans.Instance instanceInfo) throws StatefulStorageException {
-    Path path = new Path(getCheckpointPath(topologyName, checkpointId,
-                                           instanceInfo.getInfo().getComponentName(),
-                                           instanceInfo.getInfo().getTaskId()));
+  public Checkpoint restoreCheckpoint(CheckpointInfo info)
+      throws StatefulStorageException {
+    Path path = new Path(getCheckpointPath(info.getCheckpointId(),
+                                           info.getComponent(),
+                                           info.getInstanceId()));
 
     FSDataInputStream in = null;
     CheckpointManager.InstanceStateCheckpoint state = null;
@@ -115,13 +118,26 @@ public class HDFSStorage implements IStatefulStorage {
     } finally {
       SysUtils.closeIgnoringExceptions(in);
     }
-    return new Checkpoint(topologyName, instanceInfo, state);
+    return new Checkpoint(state);
   }
 
   @Override
-  public void dispose(String topologyName, String oldestCheckpointPreserved,
-                      boolean deleteAll) throws StatefulStorageException {
-    String topologyCheckpointRoot = getTopologyCheckpointRoot(topologyName);
+  public void storeComponentMetaData(CheckpointInfo info, CheckpointMetadata metadata)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+  }
+
+  @Override
+  public CheckpointMetadata restoreComponentMetadata(CheckpointInfo info)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+    return null;
+  }
+
+  @Override
+  public void dispose(String oldestCheckpointPreserved, boolean deleteAll)
+      throws StatefulStorageException {
+    String topologyCheckpointRoot = getTopologyCheckpointRoot();
     Path topologyRootPath = new Path(topologyCheckpointRoot);
 
     if (deleteAll) {
@@ -177,18 +193,16 @@ public class HDFSStorage implements IStatefulStorage {
     }
   }
 
-  private String getTopologyCheckpointRoot(String topologyName) {
+  private String getTopologyCheckpointRoot() {
     return String.format("%s/%s", checkpointRootPath, topologyName);
   }
 
-  private String getCheckpointDir(String topologyName, String checkpointId, String componentName) {
+  private String getCheckpointDir(String checkpointId, String componentName) {
     return String.format("%s/%s/%s",
-        getTopologyCheckpointRoot(topologyName), checkpointId, componentName);
+        getTopologyCheckpointRoot(), checkpointId, componentName);
   }
 
-  private String getCheckpointPath(String topologyName, String checkpointId,
-                                   String componentName, int taskId) {
-    return String.format("%s/%d", getCheckpointDir(topologyName, checkpointId, componentName),
-                         taskId);
+  private String getCheckpointPath(String checkpointId, String componentName, int taskId) {
+    return String.format("%s/%d", getCheckpointDir(checkpointId, componentName), taskId);
   }
 }
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
index 60fdc53..f7c386a 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
@@ -28,8 +28,9 @@ import com.google.protobuf.InvalidProtocolBufferException;
 
 import org.apache.heron.common.basics.FileUtils;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
+import org.apache.heron.spi.statefulstorage.CheckpointMetadata;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;
 
@@ -42,10 +43,13 @@ public class  LocalFileSystemStorage implements IStatefulStorage {
   private static final int DEFAULT_MAX_CHECKPOINTS = 10;
 
   private String checkpointRootPath;
+  private String topologyName;
   private int maxCheckpoints;
 
   @Override
-  public void init(Map<String, Object> conf) throws StatefulStorageException {
+  public void init(String topology, final Map<String, Object> conf)
+      throws StatefulStorageException {
+    this.topologyName = topology;
     checkpointRootPath = (String) conf.get(ROOT_PATH_KEY);
     maxCheckpoints = (int) conf.getOrDefault(MAX_CHECKPOINTS_KEY, DEFAULT_MAX_CHECKPOINTS);
 
@@ -63,20 +67,20 @@ public class  LocalFileSystemStorage implements IStatefulStorage {
   }
 
   @Override
-  public void store(Checkpoint checkpoint) throws StatefulStorageException {
+  public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
+      throws StatefulStorageException {
     // heron doesn't clean checkpoints stored on local disk automatically
     // localFS cleans checkpoints before store and limits the number of checkpoints saved
-    String rootPath = getTopologyCheckpointRoot(checkpoint.getTopologyName());
+    String rootPath = getTopologyCheckpointRoot();
     cleanCheckpoints(new File(rootPath), maxCheckpoints);
 
-    String path = getCheckpointPath(checkpoint.getTopologyName(), checkpoint.getCheckpointId(),
-                                    checkpoint.getComponent(), checkpoint.getTaskId());
+    String path = getCheckpointPath(info.getCheckpointId(),
+                                    info.getComponent(), info.getInstanceId());
 
     // We would try to create but we would not enforce this operation successful,
     // since it is possible already created by others
-    String checkpointDir = getCheckpointDir(checkpoint.getTopologyName(),
-                                            checkpoint.getCheckpointId(),
-                                            checkpoint.getComponent());
+    String checkpointDir = getCheckpointDir(info.getCheckpointId(),
+                                            info.getComponent());
     FileUtils.createDirectory(checkpointDir);
 
     // Do a check after the attempt
@@ -93,11 +97,10 @@ public class  LocalFileSystemStorage implements IStatefulStorage {
   }
 
   @Override
-  public Checkpoint restore(String topologyName, String checkpointId,
-                            PhysicalPlans.Instance instanceInfo) throws StatefulStorageException {
-    String path = getCheckpointPath(topologyName, checkpointId,
-                                    instanceInfo.getInfo().getComponentName(),
-                                    instanceInfo.getInfo().getTaskId());
+  public Checkpoint restoreCheckpoint(CheckpointInfo info)
+      throws StatefulStorageException {
+    String path = getCheckpointPath(info.getCheckpointId(), info.getComponent(),
+                                    info.getInstanceId());
 
     byte[] res = FileUtils.readFromFile(path);
     if (res.length != 0) {
@@ -109,16 +112,29 @@ public class  LocalFileSystemStorage implements IStatefulStorage {
       } catch (InvalidProtocolBufferException e) {
         throw new StatefulStorageException("Failed to parse the data", e);
       }
-      return new Checkpoint(topologyName, instanceInfo, state);
+      return new Checkpoint(state);
     } else {
       throw new StatefulStorageException("Failed to parse the data");
     }
   }
 
   @Override
-  public void dispose(String topologyName, String oldestCheckpointPreserved,
-                      boolean deleteAll) throws StatefulStorageException {
-    String topologyCheckpointRoot = getTopologyCheckpointRoot(topologyName);
+  public void storeComponentMetaData(CheckpointInfo info, CheckpointMetadata metadata)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+  }
+
+  @Override
+  public CheckpointMetadata restoreComponentMetadata(CheckpointInfo info)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+    return null;
+  }
+
+  @Override
+  public void dispose(String oldestCheckpointPreserved, boolean deleteAll)
+      throws StatefulStorageException {
+    String topologyCheckpointRoot = getTopologyCheckpointRoot();
 
     if (deleteAll) {
       // Clean all checkpoint states
@@ -148,6 +164,10 @@ public class  LocalFileSystemStorage implements IStatefulStorage {
     }
   }
 
+  private String getTopologyCheckpointRoot() {
+    return String.format("%s/%s", checkpointRootPath, topologyName);
+  }
+
   protected void cleanCheckpoints(File rootFile, int remaining) throws StatefulStorageException {
     if (FileUtils.isDirectoryExists(rootFile.getAbsolutePath())
         && FileUtils.hasChildren(rootFile.getAbsolutePath())) {
@@ -166,18 +186,11 @@ public class  LocalFileSystemStorage implements IStatefulStorage {
     }
   }
 
-  private String getTopologyCheckpointRoot(String topologyName) {
-    return String.format("%s/%s", checkpointRootPath, topologyName);
-  }
-
-  private String getCheckpointDir(String topologyName, String checkpointId, String componentName) {
-    return String.format("%s/%s/%s",
-        getTopologyCheckpointRoot(topologyName), checkpointId, componentName);
+  private String getCheckpointDir(String checkpointId, String componentName) {
+    return String.format("%s/%s/%s", getTopologyCheckpointRoot(), checkpointId, componentName);
   }
 
-  private String getCheckpointPath(String topologyName, String checkpointId,
-                                   String componentName, int taskId) {
-    return String.format("%s/%d", getCheckpointDir(topologyName, checkpointId, componentName),
-                         taskId);
+  private String getCheckpointPath(String checkpointId, String componentName, int taskId) {
+    return String.format("%s/%d", getCheckpointDir(checkpointId, componentName), taskId);
   }
 }
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
index d9102b1..6e55c3c 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
@@ -43,6 +43,7 @@ import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.statefulstorage.StatefulStorageTestContext;
 
 import static org.junit.Assert.assertEquals;
@@ -63,7 +64,7 @@ public class DlogStorageTest {
   private static final String ROOT_URI = "distributedlog://127.0.0.1/heron/statefulstorage";
 
   private PhysicalPlans.Instance instance;
-  private CheckpointManager.InstanceStateCheckpoint instanceStateCheckpoint;
+  private CheckpointManager.InstanceStateCheckpoint checkpointPartition;
 
   private DlogStorage dlogStorage;
   private NamespaceBuilder mockNsBuilder;
@@ -82,11 +83,11 @@ public class DlogStorageTest {
     when(mockNsBuilder.build()).thenReturn(mockNamespace);
 
     dlogStorage = new DlogStorage(() -> mockNsBuilder);
-    dlogStorage.init(config);
+    dlogStorage.init(StatefulStorageTestContext.TOPOLOGY_NAME, config);
     dlogStorage = spy(dlogStorage);
 
     instance = StatefulStorageTestContext.getInstance();
-    instanceStateCheckpoint = StatefulStorageTestContext.getInstanceStateCheckpoint();
+    checkpointPartition = StatefulStorageTestContext.getInstanceStateCheckpoint();
   }
 
   @After
@@ -100,15 +101,16 @@ public class DlogStorageTest {
     CheckpointManager.InstanceStateCheckpoint mockCheckpointState =
         mock(CheckpointManager.InstanceStateCheckpoint.class);
 
-    Checkpoint checkpoint =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, mockCheckpointState);
+    final CheckpointInfo info = new CheckpointInfo(
+        StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    Checkpoint checkpoint = new Checkpoint(mockCheckpointState);
 
     DistributedLogManager mockDLM = mock(DistributedLogManager.class);
     when(mockNamespace.openLog(anyString())).thenReturn(mockDLM);
     AppendOnlyStreamWriter mockWriter = mock(AppendOnlyStreamWriter.class);
     when(mockDLM.getAppendOnlyStreamWriter()).thenReturn(mockWriter);
 
-    dlogStorage.store(checkpoint);
+    dlogStorage.storeCheckpoint(info, checkpoint);
 
     verify(mockWriter).markEndOfStream();
     verify(mockWriter).close();
@@ -116,22 +118,18 @@ public class DlogStorageTest {
 
   @Test
   public void testRestore() throws Exception {
-    Checkpoint restoreCheckpoint =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, instanceStateCheckpoint);
-
     InputStream mockInputStream = mock(InputStream.class);
     doReturn(mockInputStream).when(dlogStorage).openInputStream(anyString());
 
     PowerMockito.spy(CheckpointManager.InstanceStateCheckpoint.class);
-    PowerMockito.doReturn(instanceStateCheckpoint)
+    PowerMockito.doReturn(checkpointPartition)
         .when(CheckpointManager.InstanceStateCheckpoint.class,
             "parseFrom", mockInputStream);
 
-    dlogStorage.restore(
-        StatefulStorageTestContext.TOPOLOGY_NAME,
-        StatefulStorageTestContext.CHECKPOINT_ID,
-        instance);
-    assertEquals(restoreCheckpoint.getCheckpoint(), instanceStateCheckpoint);
+    final CheckpointInfo info = new CheckpointInfo(
+        StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    Checkpoint restoreCheckpoint = dlogStorage.restoreCheckpoint(info);
+    assertEquals(restoreCheckpoint.getCheckpoint(), checkpointPartition);
   }
 
   @Test
@@ -167,10 +165,7 @@ public class DlogStorageTest {
     when(mockCheckpoint1.getLogs()).thenReturn(chkp1Tasks.iterator());
     when(mockCheckpoint2.getLogs()).thenReturn(chkp2Tasks.iterator());
 
-    dlogStorage.dispose(
-        StatefulStorageTestContext.TOPOLOGY_NAME,
-        "checkpoint0",
-        true);
+    dlogStorage.dispose("checkpoint0", true);
 
     verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task1"));
     verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task2"));
@@ -211,10 +206,7 @@ public class DlogStorageTest {
     when(mockCheckpoint1.getLogs()).thenReturn(chkp1Tasks.iterator());
     when(mockCheckpoint2.getLogs()).thenReturn(chkp2Tasks.iterator());
 
-    dlogStorage.dispose(
-        StatefulStorageTestContext.TOPOLOGY_NAME,
-        "checkpoint0",
-        false);
+    dlogStorage.dispose("checkpoint0", false);
 
     verify(mockCheckpoint1, times(0)).deleteLog(eq("component1_task1"));
     verify(mockCheckpoint1, times(0)).deleteLog(eq("component1_task2"));
@@ -255,10 +247,7 @@ public class DlogStorageTest {
     when(mockCheckpoint1.getLogs()).thenReturn(chkp1Tasks.iterator());
     when(mockCheckpoint2.getLogs()).thenReturn(chkp2Tasks.iterator());
 
-    dlogStorage.dispose(
-        StatefulStorageTestContext.TOPOLOGY_NAME,
-        "checkpoint2",
-        false);
+    dlogStorage.dispose("checkpoint2", false);
 
     verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task1"));
     verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task2"));
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java
index 1eeb4cc..ff65f6e 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.statefulstorage.StatefulStorageTestContext;
 
 import static org.junit.Assert.assertEquals;
@@ -77,7 +78,7 @@ public class HDFSStorageTest {
     PowerMockito.doReturn(mockFileSystem).when(FileSystem.class, "get", any(Configuration.class));
 
     hdfsStorage = spy(HDFSStorage.class);
-    hdfsStorage.init(config);
+    hdfsStorage.init(StatefulStorageTestContext.TOPOLOGY_NAME, config);
 
     instance = StatefulStorageTestContext.getInstance();
     instanceCheckpointState = StatefulStorageTestContext.getInstanceStateCheckpoint();
@@ -94,41 +95,40 @@ public class HDFSStorageTest {
     CheckpointManager.InstanceStateCheckpoint mockCheckpointState =
         mock(CheckpointManager.InstanceStateCheckpoint.class);
 
-    Checkpoint checkpoint =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, mockCheckpointState);
+    Checkpoint checkpoint = new Checkpoint(mockCheckpointState);
 
     FSDataOutputStream mockFSDateOutputStream = mock(FSDataOutputStream.class);
     when(mockFileSystem.create(any(Path.class))).thenReturn(mockFSDateOutputStream);
 
     doNothing().when(hdfsStorage).createDir(anyString());
 
-    hdfsStorage.store(checkpoint);
+    final CheckpointInfo info = new CheckpointInfo(
+        StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    hdfsStorage.storeCheckpoint(info, checkpoint);
 
     verify(mockCheckpointState).writeTo(mockFSDateOutputStream);
   }
 
   @Test
   public void testRestore() throws Exception {
-    Checkpoint restoreCheckpoint =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, instanceCheckpointState);
-
     FSDataInputStream mockFSDataInputStream = mock(FSDataInputStream.class);
 
     when(mockFileSystem.open(any(Path.class))).thenReturn(mockFSDataInputStream);
 
     PowerMockito.spy(CheckpointManager.InstanceStateCheckpoint.class);
     PowerMockito.doReturn(instanceCheckpointState)
-        .when(CheckpointManager.InstanceStateCheckpoint.class, "parseFrom", mockFSDataInputStream);
+        .when(CheckpointManager.InstanceStateCheckpoint.class, "parseFrom",
+            mockFSDataInputStream);
 
-    hdfsStorage.restore(StatefulStorageTestContext.TOPOLOGY_NAME,
+    final CheckpointInfo info = new CheckpointInfo(
         StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    Checkpoint restoreCheckpoint = hdfsStorage.restoreCheckpoint(info);
     assertEquals(restoreCheckpoint.getCheckpoint(), instanceCheckpointState);
   }
 
   @Test
   public void testDisposeAll() throws Exception {
-    hdfsStorage.dispose(StatefulStorageTestContext.TOPOLOGY_NAME,
-        StatefulStorageTestContext.CHECKPOINT_ID, true);
+    hdfsStorage.dispose(StatefulStorageTestContext.CHECKPOINT_ID, true);
     verify(mockFileSystem).delete(any(Path.class), eq(true));
   }
 
@@ -149,8 +149,7 @@ public class HDFSStorageTest {
         .thenReturn(mockFileStatus)
         .thenReturn(emptyFileStatus);
 
-    hdfsStorage.dispose(StatefulStorageTestContext.TOPOLOGY_NAME,
-        StatefulStorageTestContext.CHECKPOINT_ID, false);
+    hdfsStorage.dispose(StatefulStorageTestContext.CHECKPOINT_ID, false);
 
     verify(mockFileSystem, times(2)).delete(any(Path.class), eq(true));
   }
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
index 2875f64..e6d8481 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
@@ -35,6 +35,7 @@ import org.apache.heron.common.basics.FileUtils;
 import org.apache.heron.proto.ckptmgr.CheckpointManager.InstanceStateCheckpoint;
 import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.statefulstorage.StatefulStorageTestContext;
 
 import static org.junit.Assert.assertEquals;
@@ -61,7 +62,7 @@ public class LocalFileSystemStorageTest {
     config.put(StatefulStorageTestContext.ROOT_PATH_KEY, StatefulStorageTestContext.ROOT_PATH);
 
     localFileSystemStorage = spy(new LocalFileSystemStorage());
-    localFileSystemStorage.init(config);
+    localFileSystemStorage.init(StatefulStorageTestContext.TOPOLOGY_NAME, config);
 
     instance = StatefulStorageTestContext.getInstance();
     checkpoint = StatefulStorageTestContext.getInstanceStateCheckpoint();
@@ -85,7 +86,9 @@ public class LocalFileSystemStorageTest {
     Checkpoint mockCheckpoint = mock(Checkpoint.class);
     when(mockCheckpoint.getCheckpoint()).thenReturn(checkpoint);
 
-    localFileSystemStorage.store(mockCheckpoint);
+    final CheckpointInfo info = new CheckpointInfo(
+        StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    localFileSystemStorage.storeCheckpoint(info, mockCheckpoint);
 
     PowerMockito.verifyStatic(times(1));
     FileUtils.writeToFile(anyString(), eq(checkpoint.toByteArray()), eq(true));
@@ -118,12 +121,11 @@ public class LocalFileSystemStorageTest {
     PowerMockito.doReturn(checkpoint.toByteArray())
         .when(FileUtils.class, "readFromFile", anyString());
 
-    Checkpoint ckpt =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, checkpoint);
-
-    localFileSystemStorage.restore(StatefulStorageTestContext.TOPOLOGY_NAME,
+    final CheckpointInfo info = new CheckpointInfo(
         StatefulStorageTestContext.CHECKPOINT_ID, instance);
 
+    Checkpoint ckpt = localFileSystemStorage.restoreCheckpoint(info);
+
     assertEquals(checkpoint, ckpt.getCheckpoint());
   }
 
@@ -133,7 +135,7 @@ public class LocalFileSystemStorageTest {
     PowerMockito.doReturn(true).when(FileUtils.class, "deleteDir", anyString());
     PowerMockito.doReturn(false).when(FileUtils.class, "isDirectoryExists", anyString());
 
-    localFileSystemStorage.dispose(StatefulStorageTestContext.TOPOLOGY_NAME, "", true);
+    localFileSystemStorage.dispose("", true);
 
     PowerMockito.verifyStatic(times(1));
     FileUtils.deleteDir(anyString());
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
index 2784e67..c6e6a04 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
@@ -33,7 +33,8 @@ CkptMgrClient::CkptMgrClient(EventLoop* eventloop, const NetworkOptions& _option
                              std::function<void(const proto::system::Instance&,
                                                 const std::string&)> _ckpt_saved_watcher,
                              std::function<void(proto::system::StatusCode, sp_int32, sp_string,
-                               const proto::ckptmgr::InstanceStateCheckpoint&)> _ckpt_get_watcher,
+                               const proto::ckptmgr::InstanceStateCheckpoint&)>
+                               _ckpt_get_watcher,
                              std::function<void()> _register_watcher)
     : Client(eventloop, _options),
       topology_name_(_topology_name),
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.h b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
index 7ea30e1..2886cfb 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
@@ -37,7 +37,8 @@ class CkptMgrClient : public Client {
                 std::function<void(const proto::system::Instance&,
                                    const std::string&)> _ckpt_saved_watcher,
                 std::function<void(proto::system::StatusCode, sp_int32, sp_string,
-                              const proto::ckptmgr::InstanceStateCheckpoint&)> _ckpt_get_watcher,
+                              const proto::ckptmgr::InstanceStateCheckpoint&)>
+                              _ckpt_get_watcher,
                 std::function<void()> _register_watcher);
   virtual ~CkptMgrClient();
 
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index c85a894..5ad504a 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -82,7 +82,7 @@ class InstanceServer : public Server {
   void InitiateStatefulCheckpoint(const sp_string& _checkpoint_tag);
   // Send a RestoreInstanceStateRequest to _task_id asking it to restore itself from _state
   virtual bool SendRestoreInstanceStateRequest(sp_int32 _task_id,
-                                           const proto::ckptmgr::InstanceStateCheckpoint& _state);
+      const proto::ckptmgr::InstanceStateCheckpoint& _state);
   // Send StartInstanceStatefulProcessing message to all instances so that they can start
   // processing
   void SendStartInstanceStatefulProcessing(const std::string& _ckpt_id);
diff --git a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
index f82c1d3..538c71d 100644
--- a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
+++ b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
@@ -141,8 +141,8 @@ void StatefulRestorer::GetCheckpoints() {
 }
 
 void StatefulRestorer::HandleCheckpointState(proto::system::StatusCode _status, sp_int32 _task_id,
-                                       sp_string _checkpoint_id,
-                                       const proto::ckptmgr::InstanceStateCheckpoint& _state) {
+    sp_string _checkpoint_id,
+    const proto::ckptmgr::InstanceStateCheckpoint& _state) {
   LOG(INFO) << "Got InstanceState from checkpoint mgr for task " << _task_id
             << " and checkpoint " << _state.checkpoint_id();
   multi_count_metric_->scope(METRIC_CKPT_RESPONSES)->incr();
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index d21bc61..a1444dd 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -82,8 +82,9 @@ class StMgr {
                           proto::system::HeronTupleSet* _message);
   // Called when an instance does checkpoint and sends its checkpoint
   // to the stmgr to save it
-  void HandleStoreInstanceStateCheckpoint(const proto::ckptmgr::InstanceStateCheckpoint& _message,
-                                          const proto::system::Instance& _instance);
+  void HandleStoreInstanceStateCheckpoint(
+      const proto::ckptmgr::InstanceStateCheckpoint& _message,
+      const proto::system::Instance& _instance);
   void DrainInstanceData(sp_int32 _task_id, proto::system::HeronTupleSet2* _tuple);
   // Send checkpoint message to this task_id
   void DrainDownstreamCheckpoint(sp_int32 _task_id,
diff --git a/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp b/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp
index 5a4a77f..e2e8761 100644
--- a/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp
@@ -93,10 +93,11 @@ void DummyCkptMgrClient::DummySaveWatcher(const heron::proto::system::Instance&,
                                           const std::string&) {
 }
 
-void DummyCkptMgrClient::DummyGetWatcher(heron::proto::system::StatusCode,
-                                         int32_t,
-                                         const std::string&,
-                                         const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
+void DummyCkptMgrClient::DummyGetWatcher(
+    heron::proto::system::StatusCode,
+    int32_t,
+    const std::string&,
+    const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
 }
 
 void DummyCkptMgrClient::DummyRegisterWatcher() {
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance_server.h b/heron/stmgr/tests/cpp/server/dummy_instance_server.h
index 988aa1f..3b739ba 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance_server.h
+++ b/heron/stmgr/tests/cpp/server/dummy_instance_server.h
@@ -68,7 +68,7 @@ class DummyInstanceServer : public heron::stmgr::InstanceServer {
   virtual void SetAllInstancesConnectedToUs(bool val) { all_instances_connected_ = val; }
 
   virtual bool SendRestoreInstanceStateRequest(sp_int32 _task_id,
-                                         const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
+      const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
     restore_sent_.insert(_task_id);
     return true;
   }