You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/07/07 18:23:18 UTC

[GitHub] nlu90 closed pull request #2891: Refactor StatefulStorage API

nlu90 closed pull request #2891: Refactor StatefulStorage API
URL: https://github.com/apache/incubator-heron/pull/2891
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
index 5c590ffca5..47983863d2 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
@@ -193,7 +193,7 @@ private static IStatefulStorage setupStatefulStorage(
     }
 
     try {
-      statefulStorage.init(
+      statefulStorage.init(topologyName,
           Collections.unmodifiableMap(checkpointManagerConfig.getStatefulStorageConfig()));
     } catch (StatefulStorageException e) {
       throw new CheckpointManagerException(classname + " init threw exception", e);
diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java
index 46ce36b0fa..4b4654ad9b 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java
@@ -34,6 +34,7 @@
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.Common;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;
 
@@ -48,9 +49,14 @@
   private SocketChannel connection;
 
   public CheckpointManagerServer(
-      String topologyName, String topologyId, String checkpointMgrId,
-      IStatefulStorage statefulStorage, NIOLooper looper, String host,
-      int port, HeronSocketOptions options) {
+      String topologyName,
+      String topologyId,
+      String checkpointMgrId,
+      IStatefulStorage statefulStorage,
+      NIOLooper looper,
+      String host,
+      int port,
+      HeronSocketOptions options) {
     super(looper, host, port, options);
 
     this.topologyName = topologyName;
@@ -115,8 +121,7 @@ protected void handleCleanStatefulCheckpointRequest(
     String errorMessage = "";
 
     try {
-      statefulStorage.dispose(topologyName,
-                                 request.getOldestCheckpointPreserved(), deleteAll);
+      statefulStorage.dispose(request.getOldestCheckpointPreserved(), deleteAll);
       LOG.info("Dispose checkpoint successful");
     } catch (StatefulStorageException e) {
       errorMessage = String.format("Request to dispose checkpoint failed for oldest Checkpoint "
@@ -203,25 +208,30 @@ protected void handleSaveInstanceStateRequest(
       SocketChannel channel,
       CheckpointManager.SaveInstanceStateRequest request
   ) {
-    Checkpoint checkpoint = new Checkpoint(topologyName, request.getInstance(),
-                                           request.getCheckpoint());
+    CheckpointInfo info =
+        new CheckpointInfo(request.getCheckpoint().getCheckpointId(),
+                           request.getInstance());
+    Checkpoint checkpoint = new Checkpoint(request.getCheckpoint());
+
     LOG.info(String.format("Got a save checkpoint request for checkpointId %s "
-                           + " component %s instance %s on connection %s",
-                           checkpoint.getCheckpointId(), checkpoint.getComponent(),
-                           checkpoint.getInstance(), channel.socket().getRemoteSocketAddress()));
+                           + " component %s instanceId %s on connection %s",
+                           info.getCheckpointId(),
+                           info.getComponent(),
+                           info.getInstanceId(),
+                           channel.socket().getRemoteSocketAddress()));
 
     Common.StatusCode statusCode = Common.StatusCode.OK;
     String errorMessage = "";
     try {
-      statefulStorage.store(checkpoint);
-      LOG.info(String.format("Saved checkpoint for checkpointId %s compnent %s instance %s",
-                             checkpoint.getCheckpointId(), checkpoint.getComponent(),
-                             checkpoint.getInstance()));
+      statefulStorage.storeCheckpoint(info, checkpoint);
+      LOG.info(String.format("Saved checkpoint for checkpointId %s compnent %s instanceId %s",
+                             info.getCheckpointId(), info.getComponent(),
+                             info.getInstanceId()));
     } catch (StatefulStorageException e) {
       errorMessage = String.format("Save checkpoint not successful for checkpointId "
-                                   + "%s component %s instance %s",
-                                   checkpoint.getCheckpointId(), checkpoint.getComponent(),
-                                   checkpoint.getInstance());
+                                   + "%s component %s instanceId %s",
+                                   info.getCheckpointId(), info.getComponent(),
+                                   info.getInstanceId());
       statusCode = Common.StatusCode.NOTOK;
       LOG.log(Level.WARNING, errorMessage, e);
     }
@@ -241,11 +251,12 @@ protected void handleGetInstanceStateRequest(
       SocketChannel channel,
       CheckpointManager.GetInstanceStateRequest request
   ) {
+    CheckpointInfo info = new CheckpointInfo(request.getCheckpointId(), request.getInstance());
     LOG.info(String.format("Got a get checkpoint request for checkpointId %s "
-                           + " component %s taskId %d on connection %s",
-                           request.getCheckpointId(),
-                           request.getInstance().getInfo().getComponentName(),
-                           request.getInstance().getInfo().getTaskId(),
+                           + " component %s instanceId %d on connection %s",
+                           info.getCheckpointId(),
+                           info.getComponent(),
+                           info.getInstanceId(),
                            channel.socket().getRemoteSocketAddress()));
 
     CheckpointManager.GetInstanceStateResponse.Builder responseBuilder =
@@ -265,18 +276,20 @@ protected void handleGetInstanceStateRequest(
       responseBuilder.setCheckpoint(dummyState);
     } else {
       try {
-        Checkpoint checkpoint = statefulStorage.restore(topologyName, request.getCheckpointId(),
-                                                request.getInstance());
+        Checkpoint checkpoint = statefulStorage.restoreCheckpoint(info);
         LOG.info(String.format("Get checkpoint successful for checkpointId %s "
-                               + "component %s taskId %d", checkpoint.getCheckpointId(),
-                               checkpoint.getComponent(), checkpoint.getTaskId()));
+                               + "component %s instanceId %d",
+                               info.getCheckpointId(),
+                               info.getComponent(),
+                               info.getInstanceId()));
         // Set the checkpoint-state in response
         responseBuilder.setCheckpoint(checkpoint.getCheckpoint());
       } catch (StatefulStorageException e) {
         errorMessage = String.format("Get checkpoint not successful for checkpointId %s "
-                                     + "component %s taskId %d", request.getCheckpointId(),
-                                     request.getInstance().getInfo().getComponentName(),
-                                     request.getInstance().getInfo().getTaskId());
+                                     + "component %s instanceId %d",
+                                     info.getCheckpointId(),
+                                     info.getComponent(),
+                                     info.getInstanceId());
         LOG.log(Level.WARNING, errorMessage, e);
         statusCode = Common.StatusCode.NOTOK;
       }
diff --git a/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java b/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java
index 1ce6bff7e0..c6bffc7ca3 100644
--- a/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java
+++ b/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java
@@ -38,6 +38,7 @@
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 
 import static org.apache.heron.common.testhelpers.HeronServerTester.RESPONSE_RECEIVED_TIMEOUT;
@@ -56,7 +57,8 @@
   private static final String CHECKPOINT_ID = "checkpoint_id";
   private static final String CHECKPOINT_MANAGER_ID = "ckptmgr_id";
 
-  private static CheckpointManager.InstanceStateCheckpoint instanceStateCheckpoint;
+  private static CheckpointManager.InstanceStateCheckpoint checkpointPartition;
+  private static CheckpointManager.CheckpointComponentMetadata checkpointComponentMetadata;
   private static CheckpointManager.SaveInstanceStateRequest saveInstanceStateRequest;
   private static CheckpointManager.GetInstanceStateRequest getInstanceStateRequest;
   private static CheckpointManager.CleanStatefulCheckpointRequest cleanStatefulCheckpointRequest;
@@ -90,14 +92,19 @@ public static void setup() throws Exception {
         .setInfo(info)
         .build();
 
-    instanceStateCheckpoint = CheckpointManager.InstanceStateCheckpoint.newBuilder()
-            .setCheckpointId(CHECKPOINT_ID)
-            .setState(ByteString.copyFrom(BYTES))
-            .build();
+    checkpointPartition = CheckpointManager.InstanceStateCheckpoint.newBuilder()
+        .setCheckpointId(CHECKPOINT_ID)
+        .setState(ByteString.copyFrom(BYTES))
+        .build();
+
+    checkpointComponentMetadata = CheckpointManager.CheckpointComponentMetadata.newBuilder()
+        .setComponentName(COMPONENT_NAME)
+        .setParallelism(2)
+        .build();
 
     saveInstanceStateRequest = CheckpointManager.SaveInstanceStateRequest.newBuilder()
         .setInstance(instance)
-        .setCheckpoint(instanceStateCheckpoint)
+        .setCheckpoint(checkpointPartition)
         .build();
 
     getInstanceStateRequest = CheckpointManager.GetInstanceStateRequest.newBuilder()
@@ -154,7 +161,8 @@ public void testSaveInstanceState() throws Exception {
               @Override
               public void handleResponse(HeronClient client, StatusCode status,
                                          Object ctx, Message response) throws Exception {
-                verify(statefulStorage).store(any(Checkpoint.class));
+                verify(statefulStorage).storeCheckpoint(
+                    any(CheckpointInfo.class), any(Checkpoint.class));
                 assertEquals(CHECKPOINT_ID,
                     ((CheckpointManager.SaveInstanceStateResponse) response).getCheckpointId());
                 assertEquals(instance,
@@ -166,8 +174,10 @@ public void handleResponse(HeronClient client, StatusCode status,
 
   @Test
   public void testGetInstanceState() throws Exception {
-    final Checkpoint checkpoint = new Checkpoint(TOPOLOGY_NAME, instance, instanceStateCheckpoint);
-    when(statefulStorage.restore(TOPOLOGY_NAME, CHECKPOINT_ID, instance)).thenReturn(checkpoint);
+    final CheckpointInfo info = new CheckpointInfo(CHECKPOINT_ID, instance);
+    final Checkpoint checkpoint = new Checkpoint(checkpointPartition);
+    when(statefulStorage.restoreCheckpoint(any(CheckpointInfo.class)))
+        .thenReturn(checkpoint);
 
     runTest(TestRequestHandler.RequestType.GET_INSTANCE_STATE,
         new HeronServerTester.SuccessResponseHandler(
@@ -176,7 +186,7 @@ public void testGetInstanceState() throws Exception {
               @Override
               public void handleResponse(HeronClient client, StatusCode status,
                                          Object ctx, Message response) throws Exception {
-                verify(statefulStorage).restore(TOPOLOGY_NAME, CHECKPOINT_ID, instance);
+                verify(statefulStorage).restoreCheckpoint(info);
                 assertEquals(checkpoint.getCheckpoint(),
                     ((CheckpointManager.GetInstanceStateResponse) response).getCheckpoint());
               }
@@ -193,7 +203,7 @@ public void testCleanStatefulCheckpoint() throws Exception {
               @Override
               public void handleResponse(HeronClient client, StatusCode status,
                                          Object ctx, Message response) throws Exception {
-                verify(statefulStorage).dispose(anyString(), anyString(), anyBoolean());
+                verify(statefulStorage).dispose(anyString(), anyBoolean());
               }
             })
     );
diff --git a/heron/proto/ckptmgr.proto b/heron/proto/ckptmgr.proto
index daf5556f55..68935cc287 100644
--- a/heron/proto/ckptmgr.proto
+++ b/heron/proto/ckptmgr.proto
@@ -82,7 +82,9 @@ message StatefulConsistentCheckpoints {
   repeated StatefulConsistentCheckpoint consistent_checkpoints = 1;
 }
 
-// tmaster -> stmgr messages
+/*
+ * tmaster <-> stmgr messages
+ */
 
 // Message sent to stmgrs by the tmaster to initiate checkpointing
 message StartStatefulCheckpoint {
@@ -140,7 +142,9 @@ message StartStmgrStatefulProcessing {
   required string checkpoint_id = 1;
 }
 
-// tmaster -> ckptmgr messages
+/*
+ * tmaster -> ckptmgr messages
+ */
 
 // This is the message that a tmaster sends
 // when it wants to register itself
@@ -171,7 +175,23 @@ message CleanStatefulCheckpointResponse {
   repeated string cleaned_checkpoint_ids = 2;
 }
 
-// stmgr -> ckptmgr messages
+/*
+ * stmgr -> ckptmgr messages
+ */
+
+// This message encapsulates the info associated with
+// state of an instance/partition
+message InstanceStateCheckpoint {
+  required string checkpoint_id = 1;
+  required bytes state = 2;
+}
+
+// This message encapsulates the info associated with
+// checkpoint metadata of a component
+message CheckpointComponentMetadata {
+  required string component_name = 1;
+  required int32 parallelism = 2;
+}
 
 // This is the message that a stmgr sends
 // when it wants to register itself with
@@ -194,6 +214,8 @@ message RegisterStMgrResponse {
 message SaveInstanceStateRequest {
   // Information about the instance whose state this is
   required heron.proto.system.Instance instance = 1;
+  // TODO(nwang): currently repartition is not supported and each request
+  //     contains a single partition.
   required InstanceStateCheckpoint checkpoint = 2;
 }
 
@@ -221,7 +243,9 @@ message GetInstanceStateResponse {
   optional InstanceStateCheckpoint checkpoint = 4;
 }
 
-// stmgr -> Instance messages
+/*
+ * stmgr -> Instance messages
+ */
 
 // This is the message that the stmgr sends to its
 // local tasks to begin initiating checkpoint
@@ -229,13 +253,6 @@ message InitiateStatefulCheckpoint {
   required string checkpoint_id = 1;
 }
 
-// This proto encapsulates the info associated with
-// state of an instance
-message InstanceStateCheckpoint {
-  required string checkpoint_id = 1;
-  required bytes state = 2;
-}
-
 // This is the message that the instance sends
 // when it wants stmgr to store its state
 message StoreInstanceStateCheckpoint {
@@ -262,7 +279,9 @@ message StartInstanceStatefulProcessing {
   required string checkpoint_id = 1;
 }
 
-// stmgr -> stmgr messages
+/*
+ * stmgr -> stmgr messages
+ */
 
 // Once stmgr receives StoreInstanceStateCheckpoint from an instance,
 // it sends this message to all of that instance's 
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
index 5796d15b10..029f4f62ea 100644
--- a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
@@ -20,51 +20,27 @@
 package org.apache.heron.spi.statefulstorage;
 
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
 
+/**
+ * The checkpoint data from an instance. It wraps the serialized instance state only; the
+ * instance information is carried separately (see CheckpointInfo).
+ * TODO(nwang): Currently each instance has only one partition.
+ */
 public class Checkpoint {
-  private final String topologyName;
-  private final String checkpointId;
-  private final PhysicalPlans.Instance instanceInfo;
-
   private CheckpointManager.InstanceStateCheckpoint checkpoint;
   private int nBytes;
 
-  public Checkpoint(String topologyName, PhysicalPlans.Instance instanceInfo,
-                    CheckpointManager.InstanceStateCheckpoint checkpoint) {
-    this.topologyName = topologyName;
-    this.checkpointId = checkpoint.getCheckpointId();
-    this.instanceInfo = instanceInfo;
+  public Checkpoint(CheckpointManager.InstanceStateCheckpoint checkpoint) {
     this.checkpoint = checkpoint;
     this.nBytes = checkpoint.getSerializedSize();
   }
 
-  public String getTopologyName() {
-    return topologyName;
-  }
-
-  public String getCheckpointId() {
-    return checkpointId;
-  }
-
-  public String getComponent() {
-    return instanceInfo.getInfo().getComponentName();
-  }
-
-  public String getInstance() {
-    return instanceInfo.getInstanceId();
-  }
-
-  public int getTaskId() {
-    return instanceInfo.getInfo().getTaskId();
-  }
-
   public CheckpointManager.InstanceStateCheckpoint getCheckpoint() {
     return this.checkpoint;
   }
 
   @Override
   public String toString() {
-    return String.format("%s %s %s %s", topologyName, checkpointId, getComponent(), getInstance());
+    return String.format("Checkpoint(%d bytes)", nBytes);
   }
 }
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointInfo.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointInfo.java
new file mode 100644
index 0000000000..38c690297a
--- /dev/null
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointInfo.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.spi.statefulstorage;
+
+import org.apache.heron.proto.system.PhysicalPlans;
+
+/**
+ * The information for one checkpoint blob. It can be used to reference a checkpoint or
+ * component metadata (for metadata, the instance id is ignored).
+ */
+public class CheckpointInfo {
+  // The checkpoint ID under the topology.
+  private final String checkpointId;
+  // The name of the component.
+  private final String componentName;
+  // TODO(nwang): Currently only support one partition per instance.
+  private final int instanceId;
+
+  public CheckpointInfo(String checkpointId, PhysicalPlans.Instance instance) {
+
+    this.checkpointId = checkpointId;
+    this.componentName = instance.getInfo().getComponentName();
+    this.instanceId = instance.getInfo().getComponentIndex();
+  }
+
+  public String getCheckpointId() {
+    return checkpointId;
+  }
+
+  public String getComponent() {
+    return componentName;
+  }
+
+  public int getInstanceId() {
+    return instanceId;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("CheckpointInfo(%s %s %d)",
+                         checkpointId, componentName, instanceId);
+  }
+}
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointMetadata.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointMetadata.java
new file mode 100644
index 0000000000..4888ebf28a
--- /dev/null
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointMetadata.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.spi.statefulstorage;
+
+import org.apache.heron.proto.ckptmgr.CheckpointManager.CheckpointComponentMetadata;
+
+/**
+ * The checkpoint metadata for a component.
+ */
+public class CheckpointMetadata {
+  private CheckpointComponentMetadata metadata;
+  private int nBytes;
+
+  public CheckpointMetadata(String componentName, int parallelism) {
+    this.metadata = CheckpointComponentMetadata.newBuilder()
+      .setComponentName(componentName)
+      .setParallelism(parallelism)
+      .build();
+    this.nBytes = this.metadata.getSerializedSize();
+  }
+
+  public CheckpointComponentMetadata getMetadata() {
+    return metadata;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("CheckpointMetadata(%d bytes)", nBytes);
+  }
+}
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
index bd0965fa37..0698bf404f 100644
--- a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
@@ -21,37 +21,67 @@
 
 import java.util.Map;
 
-import org.apache.heron.proto.system.PhysicalPlans;
-
+/**
+ * The interface of all storage classes for checkpoints.
+ * For each checkpoint, two types of data are stored:
+ * - Component Meta Data (one per component).
+ * - Instance Checkpoint Data (one per instance or partition)
+ * Each Stateful Storage implementation needs to handle them accordingly.
+ */
 public interface IStatefulStorage {
   /**
    * Initialize the Stateful Storage
-   *
+   * @param topologyName The name of the topology.
    * @param conf An unmodifiableMap containing basic configuration
-   * Attempts to modify the returned map,
-   * whether direct or via its collection views, result in an UnsupportedOperationException.
    */
-  void init(Map<String, Object> conf) throws StatefulStorageException;
+  void init(String topologyName, final Map<String, Object> conf)
+      throws StatefulStorageException;
 
   /**
    * Closes the Stateful Storage
    */
   void close();
 
-  // Store the checkpoint
-  void store(Checkpoint checkpoint) throws StatefulStorageException;
-
-  // Retrieve the checkpoint
-  Checkpoint restore(String topologyName, String checkpointId,
-                     PhysicalPlans.Instance instanceInfo) throws StatefulStorageException;
-
-  // TODO(mfu): We should refactor all interfaces in IStatefulStorage,
-  // TODO(mfu): instead providing Class Checkpoint, we should provide an Context class,
-  // TODO(mfu): It should:
-  // TODO(mfu): 1. Provide meta data access, like topologyName
-  // TODO(mfu): 2. Provide utils method to parse the protobuf object, like getTaskId()
-  // TODO(mfu): 3. Common methods, like getCheckpointDir()
-  // Dispose the checkpoint
-  void dispose(String topologyName, String oldestCheckpointId, boolean deleteAll)
-                  throws StatefulStorageException;
+  /**
+   * Store instance checkpoint.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @param checkpoint The checkpoint data.
+   */
+  void storeCheckpoint(final CheckpointInfo info, final Checkpoint checkpoint)
+      throws StatefulStorageException;
+
+  /**
+   * Retrieve instance checkpoint.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @return The checkpoint data from the specified blob id.
+   */
+  Checkpoint restoreCheckpoint(final CheckpointInfo info)
+        throws StatefulStorageException;
+
+  /**
+   * Store metadata for component. Ideally in distributed storages this function should only
+   * be called once for each component. In local storages, the function should be called by
+   * every instance and the data should be stored with the checkpoint data for each partition.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @param metadata The checkpoint metadata from a component.
+   */
+  void storeComponentMetaData(final CheckpointInfo info, final CheckpointMetadata metadata)
+      throws StatefulStorageException;
+
+  /**
+   * Retrieve component metadata.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @return The checkpoint metadata for the component.
+   */
+  CheckpointMetadata restoreComponentMetadata(final CheckpointInfo info)
+      throws StatefulStorageException;
+
+  /**
+   * Dispose checkpoints.
+   * @param oldestCheckpointPreserved The oldest checkpoint id to be preserved. All checkpoints
+   *        before this id should be deleted.
+   * @param deleteAll Ignore the checkpoint Id and delete all checkpoints.
+   */
+  void dispose(String oldestCheckpointPreserved, boolean deleteAll)
+      throws StatefulStorageException;
 }
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
index a0d3247eaf..9999f36b2c 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
@@ -37,8 +37,9 @@
 import org.apache.heron.dlog.DLInputStream;
 import org.apache.heron.dlog.DLOutputStream;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
+import org.apache.heron.spi.statefulstorage.CheckpointMetadata;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;
 
@@ -52,6 +53,7 @@
   private String checkpointNamespaceUriStr;
   private URI checkpointNamespaceUri;
   private int numReplicas = 3;
+  private String topologyName;
 
   // the namespace instance
   private final Supplier<NamespaceBuilder> nsBuilderSupplier;
@@ -66,11 +68,13 @@ public DlogStorage(Supplier<NamespaceBuilder> nsBuilderSupplier) {
   }
 
   @Override
-  public void init(Map<String, Object> conf) throws StatefulStorageException {
+  public void init(String topology, Map<String, Object> conf)
+      throws StatefulStorageException {
 
     LOG.info("Initializing ... Config: " + conf.toString());
     LOG.info("Class path: " + System.getProperty("java.class.path"));
 
+    this.topologyName = topology;
     checkpointNamespaceUriStr = (String) conf.get(NS_URI_KEY);
     checkpointNamespaceUri = URI.create(checkpointNamespaceUriStr);
     Integer numReplicasValue = (Integer) conf.get(NUM_REPLICAS_KEY);
@@ -128,12 +132,13 @@ public void close() {
   }
 
   @Override
-  public void store(Checkpoint checkpoint) throws StatefulStorageException {
+  public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
+      throws StatefulStorageException {
     String checkpointPath = getCheckpointPath(
-        checkpoint.getTopologyName(),
-        checkpoint.getCheckpointId(),
-        checkpoint.getComponent(),
-        checkpoint.getTaskId());
+        topologyName,
+        info.getCheckpointId(),
+        info.getComponent(),
+        info.getInstanceId());
 
     OutputStream out = null;
     try {
@@ -147,15 +152,13 @@ public void store(Checkpoint checkpoint) throws StatefulStorageException {
   }
 
   @Override
-  public Checkpoint restore(String topologyName,
-                            String checkpointId,
-                            PhysicalPlans.Instance instanceInfo)
+  public Checkpoint restoreCheckpoint(CheckpointInfo info)
       throws StatefulStorageException {
     String checkpointPath = getCheckpointPath(
         topologyName,
-        checkpointId,
-        instanceInfo.getInfo().getComponentName(),
-        instanceInfo.getInfo().getTaskId());
+        info.getCheckpointId(),
+        info.getComponent(),
+        info.getInstanceId());
 
     InputStream in = null;
     CheckpointManager.InstanceStateCheckpoint state;
@@ -168,13 +171,24 @@ public Checkpoint restore(String topologyName,
       SysUtils.closeIgnoringExceptions(in);
     }
 
-    return new Checkpoint(topologyName, instanceInfo, state);
+    return new Checkpoint(state);
+  }
+
+  @Override
+  public void storeComponentMetaData(CheckpointInfo info, CheckpointMetadata metadata)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+  }
+
+  @Override
+  public CheckpointMetadata restoreComponentMetadata(CheckpointInfo info)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+    return null;
   }
 
   @Override
-  public void dispose(String topologyName,
-                      String oldestCheckpointId,
-                      boolean deleteAll)
+  public void dispose(String oldestCheckpointId, boolean deleteAll)
       throws StatefulStorageException {
 
     // Currently dlog doesn't support recursive deletion. so we have to fetch all the checkpoints
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java
index 1c4bc0a086..a1df8e8b40 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java
@@ -31,8 +31,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.heron.common.basics.SysUtils;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
+import org.apache.heron.spi.statefulstorage.CheckpointMetadata;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;
 
@@ -46,12 +47,15 @@
 
   private String checkpointRootPath;
   private FileSystem fileSystem;
+  private String topologyName;
 
   @Override
-  public void init(Map<String, Object> conf) throws StatefulStorageException {
+  public void init(String topology, final Map<String, Object> conf)
+      throws StatefulStorageException {
     LOG.info("Initializing... Config: " + conf.toString());
     LOG.info("Class path: " + System.getProperty("java.class.path"));
 
+    this.topologyName = topology;
     checkpointRootPath = (String) conf.get(ROOT_PATH_KEY);
 
     // Notice, we pass the config folder via classpath
@@ -73,17 +77,16 @@ public void close() {
   }
 
   @Override
-  public void store(Checkpoint checkpoint) throws StatefulStorageException {
-    Path path = new Path(getCheckpointPath(checkpoint.getTopologyName(),
-                                           checkpoint.getCheckpointId(),
-                                           checkpoint.getComponent(),
-                                           checkpoint.getTaskId()));
+  public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
+      throws StatefulStorageException {
+    Path path = new Path(getCheckpointPath(info.getCheckpointId(),
+                                           info.getComponent(),
+                                           info.getInstanceId()));
 
     // We need to ensure the existence of directories structure,
     // since it is not guaranteed that FileSystem.create(..) always creates parents' dirs.
-    String checkpointDir = getCheckpointDir(checkpoint.getTopologyName(),
-                                            checkpoint.getCheckpointId(),
-                                            checkpoint.getComponent());
+    String checkpointDir = getCheckpointDir(info.getCheckpointId(),
+                                            info.getComponent());
     createDir(checkpointDir);
 
     FSDataOutputStream out = null;
@@ -98,11 +101,11 @@ public void store(Checkpoint checkpoint) throws StatefulStorageException {
   }
 
   @Override
-  public Checkpoint restore(String topologyName, String checkpointId,
-                            PhysicalPlans.Instance instanceInfo) throws StatefulStorageException {
-    Path path = new Path(getCheckpointPath(topologyName, checkpointId,
-                                           instanceInfo.getInfo().getComponentName(),
-                                           instanceInfo.getInfo().getTaskId()));
+  public Checkpoint restoreCheckpoint(CheckpointInfo info)
+      throws StatefulStorageException {
+    Path path = new Path(getCheckpointPath(info.getCheckpointId(),
+                                           info.getComponent(),
+                                           info.getInstanceId()));
 
     FSDataInputStream in = null;
     CheckpointManager.InstanceStateCheckpoint state = null;
@@ -115,13 +118,26 @@ public Checkpoint restore(String topologyName, String checkpointId,
     } finally {
       SysUtils.closeIgnoringExceptions(in);
     }
-    return new Checkpoint(topologyName, instanceInfo, state);
+    return new Checkpoint(state);
   }
 
   @Override
-  public void dispose(String topologyName, String oldestCheckpointPreserved,
-                      boolean deleteAll) throws StatefulStorageException {
-    String topologyCheckpointRoot = getTopologyCheckpointRoot(topologyName);
+  public void storeComponentMetaData(CheckpointInfo info, CheckpointMetadata metadata)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+  }
+
+  @Override
+  public CheckpointMetadata restoreComponentMetadata(CheckpointInfo info)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+    return null;
+  }
+
+  @Override
+  public void dispose(String oldestCheckpointPreserved, boolean deleteAll)
+      throws StatefulStorageException {
+    String topologyCheckpointRoot = getTopologyCheckpointRoot();
     Path topologyRootPath = new Path(topologyCheckpointRoot);
 
     if (deleteAll) {
@@ -177,18 +193,16 @@ protected void createDir(String dir) throws StatefulStorageException {
     }
   }
 
-  private String getTopologyCheckpointRoot(String topologyName) {
+  private String getTopologyCheckpointRoot() {
     return String.format("%s/%s", checkpointRootPath, topologyName);
   }
 
-  private String getCheckpointDir(String topologyName, String checkpointId, String componentName) {
+  private String getCheckpointDir(String checkpointId, String componentName) {
     return String.format("%s/%s/%s",
-        getTopologyCheckpointRoot(topologyName), checkpointId, componentName);
+        getTopologyCheckpointRoot(), checkpointId, componentName);
   }
 
-  private String getCheckpointPath(String topologyName, String checkpointId,
-                                   String componentName, int taskId) {
-    return String.format("%s/%d", getCheckpointDir(topologyName, checkpointId, componentName),
-                         taskId);
+  private String getCheckpointPath(String checkpointId, String componentName, int taskId) {
+    return String.format("%s/%d", getCheckpointDir(checkpointId, componentName), taskId);
   }
 }
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
index 60fdc53d6d..f7c386a587 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
@@ -28,8 +28,9 @@
 
 import org.apache.heron.common.basics.FileUtils;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
+import org.apache.heron.spi.statefulstorage.CheckpointMetadata;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;
 
@@ -42,10 +43,13 @@
   private static final int DEFAULT_MAX_CHECKPOINTS = 10;
 
   private String checkpointRootPath;
+  private String topologyName;
   private int maxCheckpoints;
 
   @Override
-  public void init(Map<String, Object> conf) throws StatefulStorageException {
+  public void init(String topology, final Map<String, Object> conf)
+      throws StatefulStorageException {
+    this.topologyName = topology;
     checkpointRootPath = (String) conf.get(ROOT_PATH_KEY);
     maxCheckpoints = (int) conf.getOrDefault(MAX_CHECKPOINTS_KEY, DEFAULT_MAX_CHECKPOINTS);
 
@@ -63,20 +67,20 @@ public void close() {
   }
 
   @Override
-  public void store(Checkpoint checkpoint) throws StatefulStorageException {
+  public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
+      throws StatefulStorageException {
     // heron doesn't clean checkpoints stored on local disk automatically
     // localFS cleans checkpoints before store and limits the number of checkpoints saved
-    String rootPath = getTopologyCheckpointRoot(checkpoint.getTopologyName());
+    String rootPath = getTopologyCheckpointRoot();
     cleanCheckpoints(new File(rootPath), maxCheckpoints);
 
-    String path = getCheckpointPath(checkpoint.getTopologyName(), checkpoint.getCheckpointId(),
-                                    checkpoint.getComponent(), checkpoint.getTaskId());
+    String path = getCheckpointPath(info.getCheckpointId(),
+                                    info.getComponent(), info.getInstanceId());
 
     // We would try to create but we would not enforce this operation successful,
     // since it is possible already created by others
-    String checkpointDir = getCheckpointDir(checkpoint.getTopologyName(),
-                                            checkpoint.getCheckpointId(),
-                                            checkpoint.getComponent());
+    String checkpointDir = getCheckpointDir(info.getCheckpointId(),
+                                            info.getComponent());
     FileUtils.createDirectory(checkpointDir);
 
     // Do a check after the attempt
@@ -93,11 +97,10 @@ public void store(Checkpoint checkpoint) throws StatefulStorageException {
   }
 
   @Override
-  public Checkpoint restore(String topologyName, String checkpointId,
-                            PhysicalPlans.Instance instanceInfo) throws StatefulStorageException {
-    String path = getCheckpointPath(topologyName, checkpointId,
-                                    instanceInfo.getInfo().getComponentName(),
-                                    instanceInfo.getInfo().getTaskId());
+  public Checkpoint restoreCheckpoint(CheckpointInfo info)
+      throws StatefulStorageException {
+    String path = getCheckpointPath(info.getCheckpointId(), info.getComponent(),
+                                    info.getInstanceId());
 
     byte[] res = FileUtils.readFromFile(path);
     if (res.length != 0) {
@@ -109,16 +112,29 @@ public Checkpoint restore(String topologyName, String checkpointId,
       } catch (InvalidProtocolBufferException e) {
         throw new StatefulStorageException("Failed to parse the data", e);
       }
-      return new Checkpoint(topologyName, instanceInfo, state);
+      return new Checkpoint(state);
     } else {
       throw new StatefulStorageException("Failed to parse the data");
     }
   }
 
   @Override
-  public void dispose(String topologyName, String oldestCheckpointPreserved,
-                      boolean deleteAll) throws StatefulStorageException {
-    String topologyCheckpointRoot = getTopologyCheckpointRoot(topologyName);
+  public void storeComponentMetaData(CheckpointInfo info, CheckpointMetadata metadata)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+  }
+
+  @Override
+  public CheckpointMetadata restoreComponentMetadata(CheckpointInfo info)
+      throws StatefulStorageException {
+    // TODO(nwang): To implement
+    return null;
+  }
+
+  @Override
+  public void dispose(String oldestCheckpointPreserved, boolean deleteAll)
+      throws StatefulStorageException {
+    String topologyCheckpointRoot = getTopologyCheckpointRoot();
 
     if (deleteAll) {
       // Clean all checkpoint states
@@ -148,6 +164,10 @@ public void dispose(String topologyName, String oldestCheckpointPreserved,
     }
   }
 
+  private String getTopologyCheckpointRoot() {
+    return String.format("%s/%s", checkpointRootPath, topologyName);
+  }
+
   protected void cleanCheckpoints(File rootFile, int remaining) throws StatefulStorageException {
     if (FileUtils.isDirectoryExists(rootFile.getAbsolutePath())
         && FileUtils.hasChildren(rootFile.getAbsolutePath())) {
@@ -166,18 +186,11 @@ protected void cleanCheckpoints(File rootFile, int remaining) throws StatefulSto
     }
   }
 
-  private String getTopologyCheckpointRoot(String topologyName) {
-    return String.format("%s/%s", checkpointRootPath, topologyName);
-  }
-
-  private String getCheckpointDir(String topologyName, String checkpointId, String componentName) {
-    return String.format("%s/%s/%s",
-        getTopologyCheckpointRoot(topologyName), checkpointId, componentName);
+  private String getCheckpointDir(String checkpointId, String componentName) {
+    return String.format("%s/%s/%s", getTopologyCheckpointRoot(), checkpointId, componentName);
   }
 
-  private String getCheckpointPath(String topologyName, String checkpointId,
-                                   String componentName, int taskId) {
-    return String.format("%s/%d", getCheckpointDir(topologyName, checkpointId, componentName),
-                         taskId);
+  private String getCheckpointPath(String checkpointId, String componentName, int taskId) {
+    return String.format("%s/%d", getCheckpointDir(checkpointId, componentName), taskId);
   }
 }
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
index d9102b10be..6e55c3c57d 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
@@ -43,6 +43,7 @@
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.statefulstorage.StatefulStorageTestContext;
 
 import static org.junit.Assert.assertEquals;
@@ -63,7 +64,7 @@
   private static final String ROOT_URI = "distributedlog://127.0.0.1/heron/statefulstorage";
 
   private PhysicalPlans.Instance instance;
-  private CheckpointManager.InstanceStateCheckpoint instanceStateCheckpoint;
+  private CheckpointManager.InstanceStateCheckpoint checkpointPartition;
 
   private DlogStorage dlogStorage;
   private NamespaceBuilder mockNsBuilder;
@@ -82,11 +83,11 @@ public void before() throws Exception {
     when(mockNsBuilder.build()).thenReturn(mockNamespace);
 
     dlogStorage = new DlogStorage(() -> mockNsBuilder);
-    dlogStorage.init(config);
+    dlogStorage.init(StatefulStorageTestContext.TOPOLOGY_NAME, config);
     dlogStorage = spy(dlogStorage);
 
     instance = StatefulStorageTestContext.getInstance();
-    instanceStateCheckpoint = StatefulStorageTestContext.getInstanceStateCheckpoint();
+    checkpointPartition = StatefulStorageTestContext.getInstanceStateCheckpoint();
   }
 
   @After
@@ -100,15 +101,16 @@ public void testStore() throws Exception {
     CheckpointManager.InstanceStateCheckpoint mockCheckpointState =
         mock(CheckpointManager.InstanceStateCheckpoint.class);
 
-    Checkpoint checkpoint =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, mockCheckpointState);
+    final CheckpointInfo info = new CheckpointInfo(
+        StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    Checkpoint checkpoint = new Checkpoint(mockCheckpointState);
 
     DistributedLogManager mockDLM = mock(DistributedLogManager.class);
     when(mockNamespace.openLog(anyString())).thenReturn(mockDLM);
     AppendOnlyStreamWriter mockWriter = mock(AppendOnlyStreamWriter.class);
     when(mockDLM.getAppendOnlyStreamWriter()).thenReturn(mockWriter);
 
-    dlogStorage.store(checkpoint);
+    dlogStorage.storeCheckpoint(info, checkpoint);
 
     verify(mockWriter).markEndOfStream();
     verify(mockWriter).close();
@@ -116,22 +118,18 @@ public void testStore() throws Exception {
 
   @Test
   public void testRestore() throws Exception {
-    Checkpoint restoreCheckpoint =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, instanceStateCheckpoint);
-
     InputStream mockInputStream = mock(InputStream.class);
     doReturn(mockInputStream).when(dlogStorage).openInputStream(anyString());
 
     PowerMockito.spy(CheckpointManager.InstanceStateCheckpoint.class);
-    PowerMockito.doReturn(instanceStateCheckpoint)
+    PowerMockito.doReturn(checkpointPartition)
         .when(CheckpointManager.InstanceStateCheckpoint.class,
             "parseFrom", mockInputStream);
 
-    dlogStorage.restore(
-        StatefulStorageTestContext.TOPOLOGY_NAME,
-        StatefulStorageTestContext.CHECKPOINT_ID,
-        instance);
-    assertEquals(restoreCheckpoint.getCheckpoint(), instanceStateCheckpoint);
+    final CheckpointInfo info = new CheckpointInfo(
+        StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    Checkpoint restoreCheckpoint = dlogStorage.restoreCheckpoint(info);
+    assertEquals(restoreCheckpoint.getCheckpoint(), checkpointPartition);
   }
 
   @Test
@@ -167,10 +165,7 @@ public void testDiposeAll() throws Exception {
     when(mockCheckpoint1.getLogs()).thenReturn(chkp1Tasks.iterator());
     when(mockCheckpoint2.getLogs()).thenReturn(chkp2Tasks.iterator());
 
-    dlogStorage.dispose(
-        StatefulStorageTestContext.TOPOLOGY_NAME,
-        "checkpoint0",
-        true);
+    dlogStorage.dispose("checkpoint0", true);
 
     verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task1"));
     verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task2"));
@@ -211,10 +206,7 @@ public void testDiposeNone() throws Exception {
     when(mockCheckpoint1.getLogs()).thenReturn(chkp1Tasks.iterator());
     when(mockCheckpoint2.getLogs()).thenReturn(chkp2Tasks.iterator());
 
-    dlogStorage.dispose(
-        StatefulStorageTestContext.TOPOLOGY_NAME,
-        "checkpoint0",
-        false);
+    dlogStorage.dispose("checkpoint0", false);
 
     verify(mockCheckpoint1, times(0)).deleteLog(eq("component1_task1"));
     verify(mockCheckpoint1, times(0)).deleteLog(eq("component1_task2"));
@@ -255,10 +247,7 @@ public void testDiposePartial() throws Exception {
     when(mockCheckpoint1.getLogs()).thenReturn(chkp1Tasks.iterator());
     when(mockCheckpoint2.getLogs()).thenReturn(chkp2Tasks.iterator());
 
-    dlogStorage.dispose(
-        StatefulStorageTestContext.TOPOLOGY_NAME,
-        "checkpoint2",
-        false);
+    dlogStorage.dispose("checkpoint2", false);
 
     verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task1"));
     verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task2"));
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java
index 1eeb4cc176..ff65f6eaee 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java
@@ -41,6 +41,7 @@
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.statefulstorage.StatefulStorageTestContext;
 
 import static org.junit.Assert.assertEquals;
@@ -77,7 +78,7 @@ public void before() throws Exception {
     PowerMockito.doReturn(mockFileSystem).when(FileSystem.class, "get", any(Configuration.class));
 
     hdfsStorage = spy(HDFSStorage.class);
-    hdfsStorage.init(config);
+    hdfsStorage.init(StatefulStorageTestContext.TOPOLOGY_NAME, config);
 
     instance = StatefulStorageTestContext.getInstance();
     instanceCheckpointState = StatefulStorageTestContext.getInstanceStateCheckpoint();
@@ -94,41 +95,40 @@ public void testStore() throws Exception {
     CheckpointManager.InstanceStateCheckpoint mockCheckpointState =
         mock(CheckpointManager.InstanceStateCheckpoint.class);
 
-    Checkpoint checkpoint =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, mockCheckpointState);
+    Checkpoint checkpoint = new Checkpoint(mockCheckpointState);
 
     FSDataOutputStream mockFSDateOutputStream = mock(FSDataOutputStream.class);
     when(mockFileSystem.create(any(Path.class))).thenReturn(mockFSDateOutputStream);
 
     doNothing().when(hdfsStorage).createDir(anyString());
 
-    hdfsStorage.store(checkpoint);
+    final CheckpointInfo info = new CheckpointInfo(
+        StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    hdfsStorage.storeCheckpoint(info, checkpoint);
 
     verify(mockCheckpointState).writeTo(mockFSDateOutputStream);
   }
 
   @Test
   public void testRestore() throws Exception {
-    Checkpoint restoreCheckpoint =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, instanceCheckpointState);
-
     FSDataInputStream mockFSDataInputStream = mock(FSDataInputStream.class);
 
     when(mockFileSystem.open(any(Path.class))).thenReturn(mockFSDataInputStream);
 
     PowerMockito.spy(CheckpointManager.InstanceStateCheckpoint.class);
     PowerMockito.doReturn(instanceCheckpointState)
-        .when(CheckpointManager.InstanceStateCheckpoint.class, "parseFrom", mockFSDataInputStream);
+        .when(CheckpointManager.InstanceStateCheckpoint.class, "parseFrom",
+            mockFSDataInputStream);
 
-    hdfsStorage.restore(StatefulStorageTestContext.TOPOLOGY_NAME,
+    final CheckpointInfo info = new CheckpointInfo(
         StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    Checkpoint restoreCheckpoint = hdfsStorage.restoreCheckpoint(info);
     assertEquals(restoreCheckpoint.getCheckpoint(), instanceCheckpointState);
   }
 
   @Test
   public void testDisposeAll() throws Exception {
-    hdfsStorage.dispose(StatefulStorageTestContext.TOPOLOGY_NAME,
-        StatefulStorageTestContext.CHECKPOINT_ID, true);
+    hdfsStorage.dispose(StatefulStorageTestContext.CHECKPOINT_ID, true);
     verify(mockFileSystem).delete(any(Path.class), eq(true));
   }
 
@@ -149,8 +149,7 @@ public void testDisposePartial() throws Exception {
         .thenReturn(mockFileStatus)
         .thenReturn(emptyFileStatus);
 
-    hdfsStorage.dispose(StatefulStorageTestContext.TOPOLOGY_NAME,
-        StatefulStorageTestContext.CHECKPOINT_ID, false);
+    hdfsStorage.dispose(StatefulStorageTestContext.CHECKPOINT_ID, false);
 
     verify(mockFileSystem, times(2)).delete(any(Path.class), eq(true));
   }
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
index 2875f64f83..e6d8481dfb 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
@@ -35,6 +35,7 @@
 import org.apache.heron.proto.ckptmgr.CheckpointManager.InstanceStateCheckpoint;
 import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
 import org.apache.heron.statefulstorage.StatefulStorageTestContext;
 
 import static org.junit.Assert.assertEquals;
@@ -61,7 +62,7 @@ public void before() throws Exception {
     config.put(StatefulStorageTestContext.ROOT_PATH_KEY, StatefulStorageTestContext.ROOT_PATH);
 
     localFileSystemStorage = spy(new LocalFileSystemStorage());
-    localFileSystemStorage.init(config);
+    localFileSystemStorage.init(StatefulStorageTestContext.TOPOLOGY_NAME, config);
 
     instance = StatefulStorageTestContext.getInstance();
     checkpoint = StatefulStorageTestContext.getInstanceStateCheckpoint();
@@ -85,7 +86,9 @@ public void testStore() throws Exception {
     Checkpoint mockCheckpoint = mock(Checkpoint.class);
     when(mockCheckpoint.getCheckpoint()).thenReturn(checkpoint);
 
-    localFileSystemStorage.store(mockCheckpoint);
+    final CheckpointInfo info = new CheckpointInfo(
+        StatefulStorageTestContext.CHECKPOINT_ID, instance);
+    localFileSystemStorage.storeCheckpoint(info, mockCheckpoint);
 
     PowerMockito.verifyStatic(times(1));
     FileUtils.writeToFile(anyString(), eq(checkpoint.toByteArray()), eq(true));
@@ -118,12 +121,11 @@ public void testRestore() throws Exception {
     PowerMockito.doReturn(checkpoint.toByteArray())
         .when(FileUtils.class, "readFromFile", anyString());
 
-    Checkpoint ckpt =
-        new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, checkpoint);
-
-    localFileSystemStorage.restore(StatefulStorageTestContext.TOPOLOGY_NAME,
+    final CheckpointInfo info = new CheckpointInfo(
         StatefulStorageTestContext.CHECKPOINT_ID, instance);
 
+    Checkpoint ckpt = localFileSystemStorage.restoreCheckpoint(info);
+
     assertEquals(checkpoint, ckpt.getCheckpoint());
   }
 
@@ -133,7 +135,7 @@ public void testDispose() throws Exception {
     PowerMockito.doReturn(true).when(FileUtils.class, "deleteDir", anyString());
     PowerMockito.doReturn(false).when(FileUtils.class, "isDirectoryExists", anyString());
 
-    localFileSystemStorage.dispose(StatefulStorageTestContext.TOPOLOGY_NAME, "", true);
+    localFileSystemStorage.dispose("", true);
 
     PowerMockito.verifyStatic(times(1));
     FileUtils.deleteDir(anyString());
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
index 2784e67afb..c6e6a043f9 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
@@ -33,7 +33,8 @@ CkptMgrClient::CkptMgrClient(EventLoop* eventloop, const NetworkOptions& _option
                              std::function<void(const proto::system::Instance&,
                                                 const std::string&)> _ckpt_saved_watcher,
                              std::function<void(proto::system::StatusCode, sp_int32, sp_string,
-                               const proto::ckptmgr::InstanceStateCheckpoint&)> _ckpt_get_watcher,
+                               const proto::ckptmgr::InstanceStateCheckpoint&)>
+                               _ckpt_get_watcher,
                              std::function<void()> _register_watcher)
     : Client(eventloop, _options),
       topology_name_(_topology_name),
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.h b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
index 7ea30e1416..2886cfb73e 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
@@ -37,7 +37,8 @@ class CkptMgrClient : public Client {
                 std::function<void(const proto::system::Instance&,
                                    const std::string&)> _ckpt_saved_watcher,
                 std::function<void(proto::system::StatusCode, sp_int32, sp_string,
-                              const proto::ckptmgr::InstanceStateCheckpoint&)> _ckpt_get_watcher,
+                              const proto::ckptmgr::InstanceStateCheckpoint&)>
+                              _ckpt_get_watcher,
                 std::function<void()> _register_watcher);
   virtual ~CkptMgrClient();
 
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index c85a8949da..5ad504a733 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -82,7 +82,7 @@ class InstanceServer : public Server {
   void InitiateStatefulCheckpoint(const sp_string& _checkpoint_tag);
   // Send a RestoreInstanceStateRequest to _task_id asking it to restore itself from _state
   virtual bool SendRestoreInstanceStateRequest(sp_int32 _task_id,
-                                           const proto::ckptmgr::InstanceStateCheckpoint& _state);
+      const proto::ckptmgr::InstanceStateCheckpoint& _state);
   // Send StartInstanceStatefulProcessing message to all instances so that they can start
   // processing
   void SendStartInstanceStatefulProcessing(const std::string& _ckpt_id);
diff --git a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
index f82c1d3145..538c71d7c8 100644
--- a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
+++ b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
@@ -141,8 +141,8 @@ void StatefulRestorer::GetCheckpoints() {
 }
 
 void StatefulRestorer::HandleCheckpointState(proto::system::StatusCode _status, sp_int32 _task_id,
-                                       sp_string _checkpoint_id,
-                                       const proto::ckptmgr::InstanceStateCheckpoint& _state) {
+    sp_string _checkpoint_id,
+    const proto::ckptmgr::InstanceStateCheckpoint& _state) {
   LOG(INFO) << "Got InstanceState from checkpoint mgr for task " << _task_id
             << " and checkpoint " << _state.checkpoint_id();
   multi_count_metric_->scope(METRIC_CKPT_RESPONSES)->incr();
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index d21bc61ff6..a1444dd39e 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -82,8 +82,9 @@ class StMgr {
                           proto::system::HeronTupleSet* _message);
   // Called when an instance does checkpoint and sends its checkpoint
   // to the stmgr to save it
-  void HandleStoreInstanceStateCheckpoint(const proto::ckptmgr::InstanceStateCheckpoint& _message,
-                                          const proto::system::Instance& _instance);
+  void HandleStoreInstanceStateCheckpoint(
+      const proto::ckptmgr::InstanceStateCheckpoint& _message,
+      const proto::system::Instance& _instance);
   void DrainInstanceData(sp_int32 _task_id, proto::system::HeronTupleSet2* _tuple);
   // Send checkpoint message to this task_id
   void DrainDownstreamCheckpoint(sp_int32 _task_id,
diff --git a/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp b/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp
index 5a4a77f4b9..e2e8761f00 100644
--- a/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp
@@ -93,10 +93,11 @@ void DummyCkptMgrClient::DummySaveWatcher(const heron::proto::system::Instance&,
                                           const std::string&) {
 }
 
-void DummyCkptMgrClient::DummyGetWatcher(heron::proto::system::StatusCode,
-                                         int32_t,
-                                         const std::string&,
-                                         const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
+void DummyCkptMgrClient::DummyGetWatcher(
+    heron::proto::system::StatusCode,
+    int32_t,
+    const std::string&,
+    const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
 }
 
 void DummyCkptMgrClient::DummyRegisterWatcher() {
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance_server.h b/heron/stmgr/tests/cpp/server/dummy_instance_server.h
index 988aa1ffee..3b739ba644 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance_server.h
+++ b/heron/stmgr/tests/cpp/server/dummy_instance_server.h
@@ -68,7 +68,7 @@ class DummyInstanceServer : public heron::stmgr::InstanceServer {
   virtual void SetAllInstancesConnectedToUs(bool val) { all_instances_connected_ = val; }
 
   virtual bool SendRestoreInstanceStateRequest(sp_int32 _task_id,
-                                         const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
+      const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
     restore_sent_.insert(_task_id);
     return true;
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services