You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/07/07 18:23:19 UTC
[incubator-heron] branch master updated: Refactor StatefulStorage API (#2891)
This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 581344e Refactor StatefulStorage API (#2891)
581344e is described below
commit 581344ea42033a2f5e826d487c355061cee58241
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Sat Jul 7 11:23:16 2018 -0700
Refactor StatefulStorage API (#2891)
* Refactor StatefulStorage
* fix test
* fix CheckpointManagerServerTest
* component metadata is optional
* Update names to be more consistent
* remove data_version and partition_id from protobuf for now
* Remove metadata field from save/get checkpoint requests
* More name and comment changes
* revert refactor changes in CheckpointManagerServer
* Rename InstanceStateCheckpointPartition to InstanceStateCheckpoint
---
.../apache/heron/ckptmgr/CheckpointManager.java | 2 +-
.../heron/ckptmgr/CheckpointManagerServer.java | 69 ++++++++++++--------
.../heron/ckptmgr/CheckpointManagerServerTest.java | 32 ++++++----
heron/proto/ckptmgr.proto | 43 +++++++++----
.../heron/spi/statefulstorage/Checkpoint.java | 38 ++---------
.../{Checkpoint.java => CheckpointInfo.java} | 48 ++++++--------
.../spi/statefulstorage/CheckpointMetadata.java | 47 ++++++++++++++
.../spi/statefulstorage/IStatefulStorage.java | 74 +++++++++++++++-------
.../heron/statefulstorage/dlog/DlogStorage.java | 48 +++++++++-----
.../heron/statefulstorage/hdfs/HDFSStorage.java | 66 +++++++++++--------
.../localfs/LocalFileSystemStorage.java | 71 ++++++++++++---------
.../statefulstorage/dlog/DlogStorageTest.java | 43 +++++--------
.../statefulstorage/hdfs/HDFSStorageTest.java | 25 ++++----
.../localfs/LocalFileSystemStorageTest.java | 16 +++--
heron/stmgr/src/cpp/manager/ckptmgr-client.cpp | 3 +-
heron/stmgr/src/cpp/manager/ckptmgr-client.h | 3 +-
heron/stmgr/src/cpp/manager/instance-server.h | 2 +-
heron/stmgr/src/cpp/manager/stateful-restorer.cpp | 4 +-
heron/stmgr/src/cpp/manager/stmgr.h | 5 +-
.../tests/cpp/server/dummy_ckptmgr_client.cpp | 9 +--
.../stmgr/tests/cpp/server/dummy_instance_server.h | 2 +-
21 files changed, 385 insertions(+), 265 deletions(-)
diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
index 5c590ff..4798386 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
@@ -193,7 +193,7 @@ public class CheckpointManager {
}
try {
- statefulStorage.init(
+ statefulStorage.init(topologyName,
Collections.unmodifiableMap(checkpointManagerConfig.getStatefulStorageConfig()));
} catch (StatefulStorageException e) {
throw new CheckpointManagerException(classname + " init threw exception", e);
diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java
index 46ce36b..4b4654a 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerServer.java
@@ -34,6 +34,7 @@ import org.apache.heron.common.network.REQID;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.Common;
import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
import org.apache.heron.spi.statefulstorage.IStatefulStorage;
import org.apache.heron.spi.statefulstorage.StatefulStorageException;
@@ -48,9 +49,14 @@ public class CheckpointManagerServer extends HeronServer {
private SocketChannel connection;
public CheckpointManagerServer(
- String topologyName, String topologyId, String checkpointMgrId,
- IStatefulStorage statefulStorage, NIOLooper looper, String host,
- int port, HeronSocketOptions options) {
+ String topologyName,
+ String topologyId,
+ String checkpointMgrId,
+ IStatefulStorage statefulStorage,
+ NIOLooper looper,
+ String host,
+ int port,
+ HeronSocketOptions options) {
super(looper, host, port, options);
this.topologyName = topologyName;
@@ -115,8 +121,7 @@ public class CheckpointManagerServer extends HeronServer {
String errorMessage = "";
try {
- statefulStorage.dispose(topologyName,
- request.getOldestCheckpointPreserved(), deleteAll);
+ statefulStorage.dispose(request.getOldestCheckpointPreserved(), deleteAll);
LOG.info("Dispose checkpoint successful");
} catch (StatefulStorageException e) {
errorMessage = String.format("Request to dispose checkpoint failed for oldest Checkpoint "
@@ -203,25 +208,30 @@ public class CheckpointManagerServer extends HeronServer {
SocketChannel channel,
CheckpointManager.SaveInstanceStateRequest request
) {
- Checkpoint checkpoint = new Checkpoint(topologyName, request.getInstance(),
- request.getCheckpoint());
+ CheckpointInfo info =
+ new CheckpointInfo(request.getCheckpoint().getCheckpointId(),
+ request.getInstance());
+ Checkpoint checkpoint = new Checkpoint(request.getCheckpoint());
+
LOG.info(String.format("Got a save checkpoint request for checkpointId %s "
- + " component %s instance %s on connection %s",
- checkpoint.getCheckpointId(), checkpoint.getComponent(),
- checkpoint.getInstance(), channel.socket().getRemoteSocketAddress()));
+ + " component %s instanceId %s on connection %s",
+ info.getCheckpointId(),
+ info.getComponent(),
+ info.getInstanceId(),
+ channel.socket().getRemoteSocketAddress()));
Common.StatusCode statusCode = Common.StatusCode.OK;
String errorMessage = "";
try {
- statefulStorage.store(checkpoint);
- LOG.info(String.format("Saved checkpoint for checkpointId %s compnent %s instance %s",
- checkpoint.getCheckpointId(), checkpoint.getComponent(),
- checkpoint.getInstance()));
+ statefulStorage.storeCheckpoint(info, checkpoint);
+ LOG.info(String.format("Saved checkpoint for checkpointId %s compnent %s instanceId %s",
+ info.getCheckpointId(), info.getComponent(),
+ info.getInstanceId()));
} catch (StatefulStorageException e) {
errorMessage = String.format("Save checkpoint not successful for checkpointId "
- + "%s component %s instance %s",
- checkpoint.getCheckpointId(), checkpoint.getComponent(),
- checkpoint.getInstance());
+ + "%s component %s instanceId %s",
+ info.getCheckpointId(), info.getComponent(),
+ info.getInstanceId());
statusCode = Common.StatusCode.NOTOK;
LOG.log(Level.WARNING, errorMessage, e);
}
@@ -241,11 +251,12 @@ public class CheckpointManagerServer extends HeronServer {
SocketChannel channel,
CheckpointManager.GetInstanceStateRequest request
) {
+ CheckpointInfo info = new CheckpointInfo(request.getCheckpointId(), request.getInstance());
LOG.info(String.format("Got a get checkpoint request for checkpointId %s "
- + " component %s taskId %d on connection %s",
- request.getCheckpointId(),
- request.getInstance().getInfo().getComponentName(),
- request.getInstance().getInfo().getTaskId(),
+ + " component %s instanceId %d on connection %s",
+ info.getCheckpointId(),
+ info.getComponent(),
+ info.getInstanceId(),
channel.socket().getRemoteSocketAddress()));
CheckpointManager.GetInstanceStateResponse.Builder responseBuilder =
@@ -265,18 +276,20 @@ public class CheckpointManagerServer extends HeronServer {
responseBuilder.setCheckpoint(dummyState);
} else {
try {
- Checkpoint checkpoint = statefulStorage.restore(topologyName, request.getCheckpointId(),
- request.getInstance());
+ Checkpoint checkpoint = statefulStorage.restoreCheckpoint(info);
LOG.info(String.format("Get checkpoint successful for checkpointId %s "
- + "component %s taskId %d", checkpoint.getCheckpointId(),
- checkpoint.getComponent(), checkpoint.getTaskId()));
+ + "component %s instanceId %d",
+ info.getCheckpointId(),
+ info.getComponent(),
+ info.getInstanceId()));
// Set the checkpoint-state in response
responseBuilder.setCheckpoint(checkpoint.getCheckpoint());
} catch (StatefulStorageException e) {
errorMessage = String.format("Get checkpoint not successful for checkpointId %s "
- + "component %s taskId %d", request.getCheckpointId(),
- request.getInstance().getInfo().getComponentName(),
- request.getInstance().getInfo().getTaskId());
+ + "component %s instanceId %d",
+ info.getCheckpointId(),
+ info.getComponent(),
+ info.getInstanceId());
LOG.log(Level.WARNING, errorMessage, e);
statusCode = Common.StatusCode.NOTOK;
}
diff --git a/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java b/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java
index 1ce6bff..c6bffc7 100644
--- a/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java
+++ b/heron/ckptmgr/tests/java/org/apache/heron/ckptmgr/CheckpointManagerServerTest.java
@@ -38,6 +38,7 @@ import org.apache.heron.common.testhelpers.HeronServerTester;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
import org.apache.heron.spi.statefulstorage.IStatefulStorage;
import static org.apache.heron.common.testhelpers.HeronServerTester.RESPONSE_RECEIVED_TIMEOUT;
@@ -56,7 +57,8 @@ public class CheckpointManagerServerTest {
private static final String CHECKPOINT_ID = "checkpoint_id";
private static final String CHECKPOINT_MANAGER_ID = "ckptmgr_id";
- private static CheckpointManager.InstanceStateCheckpoint instanceStateCheckpoint;
+ private static CheckpointManager.InstanceStateCheckpoint checkpointPartition;
+ private static CheckpointManager.CheckpointComponentMetadata checkpointComponentMetadata;
private static CheckpointManager.SaveInstanceStateRequest saveInstanceStateRequest;
private static CheckpointManager.GetInstanceStateRequest getInstanceStateRequest;
private static CheckpointManager.CleanStatefulCheckpointRequest cleanStatefulCheckpointRequest;
@@ -90,14 +92,19 @@ public class CheckpointManagerServerTest {
.setInfo(info)
.build();
- instanceStateCheckpoint = CheckpointManager.InstanceStateCheckpoint.newBuilder()
- .setCheckpointId(CHECKPOINT_ID)
- .setState(ByteString.copyFrom(BYTES))
- .build();
+ checkpointPartition = CheckpointManager.InstanceStateCheckpoint.newBuilder()
+ .setCheckpointId(CHECKPOINT_ID)
+ .setState(ByteString.copyFrom(BYTES))
+ .build();
+
+ checkpointComponentMetadata = CheckpointManager.CheckpointComponentMetadata.newBuilder()
+ .setComponentName(COMPONENT_NAME)
+ .setParallelism(2)
+ .build();
saveInstanceStateRequest = CheckpointManager.SaveInstanceStateRequest.newBuilder()
.setInstance(instance)
- .setCheckpoint(instanceStateCheckpoint)
+ .setCheckpoint(checkpointPartition)
.build();
getInstanceStateRequest = CheckpointManager.GetInstanceStateRequest.newBuilder()
@@ -154,7 +161,8 @@ public class CheckpointManagerServerTest {
@Override
public void handleResponse(HeronClient client, StatusCode status,
Object ctx, Message response) throws Exception {
- verify(statefulStorage).store(any(Checkpoint.class));
+ verify(statefulStorage).storeCheckpoint(
+ any(CheckpointInfo.class), any(Checkpoint.class));
assertEquals(CHECKPOINT_ID,
((CheckpointManager.SaveInstanceStateResponse) response).getCheckpointId());
assertEquals(instance,
@@ -166,8 +174,10 @@ public class CheckpointManagerServerTest {
@Test
public void testGetInstanceState() throws Exception {
- final Checkpoint checkpoint = new Checkpoint(TOPOLOGY_NAME, instance, instanceStateCheckpoint);
- when(statefulStorage.restore(TOPOLOGY_NAME, CHECKPOINT_ID, instance)).thenReturn(checkpoint);
+ final CheckpointInfo info = new CheckpointInfo(CHECKPOINT_ID, instance);
+ final Checkpoint checkpoint = new Checkpoint(checkpointPartition);
+ when(statefulStorage.restoreCheckpoint(any(CheckpointInfo.class)))
+ .thenReturn(checkpoint);
runTest(TestRequestHandler.RequestType.GET_INSTANCE_STATE,
new HeronServerTester.SuccessResponseHandler(
@@ -176,7 +186,7 @@ public class CheckpointManagerServerTest {
@Override
public void handleResponse(HeronClient client, StatusCode status,
Object ctx, Message response) throws Exception {
- verify(statefulStorage).restore(TOPOLOGY_NAME, CHECKPOINT_ID, instance);
+ verify(statefulStorage).restoreCheckpoint(info);
assertEquals(checkpoint.getCheckpoint(),
((CheckpointManager.GetInstanceStateResponse) response).getCheckpoint());
}
@@ -193,7 +203,7 @@ public class CheckpointManagerServerTest {
@Override
public void handleResponse(HeronClient client, StatusCode status,
Object ctx, Message response) throws Exception {
- verify(statefulStorage).dispose(anyString(), anyString(), anyBoolean());
+ verify(statefulStorage).dispose(anyString(), anyBoolean());
}
})
);
diff --git a/heron/proto/ckptmgr.proto b/heron/proto/ckptmgr.proto
index daf5556..68935cc 100644
--- a/heron/proto/ckptmgr.proto
+++ b/heron/proto/ckptmgr.proto
@@ -82,7 +82,9 @@ message StatefulConsistentCheckpoints {
repeated StatefulConsistentCheckpoint consistent_checkpoints = 1;
}
-// tmaster -> stmgr messages
+/*
+ * tmaster <-> stmgr messages
+ */
// Message sent to stmgrs by the tmaster to initiate checkpointing
message StartStatefulCheckpoint {
@@ -140,7 +142,9 @@ message StartStmgrStatefulProcessing {
required string checkpoint_id = 1;
}
-// tmaster -> ckptmgr messages
+/*
+ * tmaster -> ckptmgr messages
+ */
// This is the message that a tmaster sends
// when it wants to register itself
@@ -171,7 +175,23 @@ message CleanStatefulCheckpointResponse {
repeated string cleaned_checkpoint_ids = 2;
}
-// stmgr -> ckptmgr messages
+/*
+ * stmgr -> ckptmgr messages
+ */
+
+// This message encapsulates the info associated with
+// state of an instance/partition
+message InstanceStateCheckpoint {
+ required string checkpoint_id = 1;
+ required bytes state = 2;
+}
+
+// This message encapsulates the info associated with
+// checkpoint metadata of a component
+message CheckpointComponentMetadata {
+ required string component_name = 1;
+ required int32 parallelism = 2;
+}
// This is the message that a stmgr sends
// when it wants to register itself with
@@ -194,6 +214,8 @@ message RegisterStMgrResponse {
message SaveInstanceStateRequest {
// Information about the instance whose state this is
required heron.proto.system.Instance instance = 1;
+ // TODO(nwang): currently repartition is not supported and each request
+ // contains a single partition.
required InstanceStateCheckpoint checkpoint = 2;
}
@@ -221,7 +243,9 @@ message GetInstanceStateResponse {
optional InstanceStateCheckpoint checkpoint = 4;
}
-// stmgr -> Instance messages
+/*
+ * stmgr -> Instance messages
+ */
// This is the message that the stmgr sends to its
// local tasks to begin initiating checkpoint
@@ -229,13 +253,6 @@ message InitiateStatefulCheckpoint {
required string checkpoint_id = 1;
}
-// This proto encapsulates the info associated with
-// state of an instance
-message InstanceStateCheckpoint {
- required string checkpoint_id = 1;
- required bytes state = 2;
-}
-
// This is the message that the instance sends
// when it wants stmgr to store its state
message StoreInstanceStateCheckpoint {
@@ -262,7 +279,9 @@ message StartInstanceStatefulProcessing {
required string checkpoint_id = 1;
}
-// stmgr -> stmgr messages
+/*
+ * stmgr -> stmgr messages
+ */
// Once stmgr receives StoreInstanceStateCheckpoint from an instance,
// it sends this message to all of that instance's
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
index 5796d15..029f4f6 100644
--- a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
@@ -20,51 +20,27 @@
package org.apache.heron.spi.statefulstorage;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
+/**
+ * The checkpoint data from an instance. It wraps only the serialized instance state; the
+ * identifying information (checkpoint id, component, instance) lives in CheckpointInfo.
+ * TODO(nwang): Currently each instance has only one partition.
+ */
public class Checkpoint {
- private final String topologyName;
- private final String checkpointId;
- private final PhysicalPlans.Instance instanceInfo;
-
private CheckpointManager.InstanceStateCheckpoint checkpoint;
private int nBytes;
- public Checkpoint(String topologyName, PhysicalPlans.Instance instanceInfo,
- CheckpointManager.InstanceStateCheckpoint checkpoint) {
- this.topologyName = topologyName;
- this.checkpointId = checkpoint.getCheckpointId();
- this.instanceInfo = instanceInfo;
+ public Checkpoint(CheckpointManager.InstanceStateCheckpoint checkpoint) {
this.checkpoint = checkpoint;
this.nBytes = checkpoint.getSerializedSize();
}
- public String getTopologyName() {
- return topologyName;
- }
-
- public String getCheckpointId() {
- return checkpointId;
- }
-
- public String getComponent() {
- return instanceInfo.getInfo().getComponentName();
- }
-
- public String getInstance() {
- return instanceInfo.getInstanceId();
- }
-
- public int getTaskId() {
- return instanceInfo.getInfo().getTaskId();
- }
-
public CheckpointManager.InstanceStateCheckpoint getCheckpoint() {
return this.checkpoint;
}
@Override
public String toString() {
- return String.format("%s %s %s %s", topologyName, checkpointId, getComponent(), getInstance());
+ return String.format("Checkpoint(%d bytes)", nBytes);
}
}
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointInfo.java
similarity index 50%
copy from heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
copy to heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointInfo.java
index 5796d15..38c6902 100644
--- a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/Checkpoint.java
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointInfo.java
@@ -19,28 +19,25 @@
package org.apache.heron.spi.statefulstorage;
-import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.PhysicalPlans;
-public class Checkpoint {
- private final String topologyName;
+/**
+ * The information for one checkpoint blob. It is used as the key to reference a checkpoint
+ * or component metadata (for metadata, the instance id is ignored).
+ */
+public class CheckpointInfo {
+ // The checkpoint ID under the topology.
private final String checkpointId;
- private final PhysicalPlans.Instance instanceInfo;
+ // The name of the component.
+ private final String componentName;
+  // TODO(nwang): Currently only one partition per instance is supported.
+ private final int instanceId;
- private CheckpointManager.InstanceStateCheckpoint checkpoint;
- private int nBytes;
-
- public Checkpoint(String topologyName, PhysicalPlans.Instance instanceInfo,
- CheckpointManager.InstanceStateCheckpoint checkpoint) {
- this.topologyName = topologyName;
- this.checkpointId = checkpoint.getCheckpointId();
- this.instanceInfo = instanceInfo;
- this.checkpoint = checkpoint;
- this.nBytes = checkpoint.getSerializedSize();
- }
+ public CheckpointInfo(String checkpointId, PhysicalPlans.Instance instance) {
- public String getTopologyName() {
- return topologyName;
+ this.checkpointId = checkpointId;
+ this.componentName = instance.getInfo().getComponentName();
+ this.instanceId = instance.getInfo().getComponentIndex();
}
public String getCheckpointId() {
@@ -48,23 +45,16 @@ public class Checkpoint {
}
public String getComponent() {
- return instanceInfo.getInfo().getComponentName();
- }
-
- public String getInstance() {
- return instanceInfo.getInstanceId();
- }
-
- public int getTaskId() {
- return instanceInfo.getInfo().getTaskId();
+ return componentName;
}
- public CheckpointManager.InstanceStateCheckpoint getCheckpoint() {
- return this.checkpoint;
+ public int getInstanceId() {
+ return instanceId;
}
@Override
public String toString() {
- return String.format("%s %s %s %s", topologyName, checkpointId, getComponent(), getInstance());
+ return String.format("CheckpointInfo(%s %s %d)",
+ checkpointId, componentName, instanceId);
}
}
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointMetadata.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointMetadata.java
new file mode 100644
index 0000000..4888ebf
--- /dev/null
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/CheckpointMetadata.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.spi.statefulstorage;
+
+import org.apache.heron.proto.ckptmgr.CheckpointManager.CheckpointComponentMetadata;
+
+/**
+ * The checkpoint metadata for a component.
+ */
+public class CheckpointMetadata {
+ private CheckpointComponentMetadata metadata;
+ private int nBytes;
+
+ public CheckpointMetadata(String componentName, int parallelism) {
+ this.metadata = CheckpointComponentMetadata.newBuilder()
+ .setComponentName(componentName)
+ .setParallelism(parallelism)
+ .build();
+ this.nBytes = this.metadata.getSerializedSize();
+ }
+
+ public CheckpointComponentMetadata getMetadata() {
+ return metadata;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("CheckpointMetadata(%d bytes)", nBytes);
+ }
+}
diff --git a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
index bd0965f..0698bf4 100644
--- a/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
+++ b/heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
@@ -21,37 +21,67 @@ package org.apache.heron.spi.statefulstorage;
import java.util.Map;
-import org.apache.heron.proto.system.PhysicalPlans;
-
+/**
+ * The interface of all storage classes for checkpoints.
+ * For each checkpoint, two types of data are stored:
+ * - Component Meta Data (one per component).
+ * - Instance Checkpoint Data (one per instance or partition).
+ * Each Stateful Storage implementation needs to handle them accordingly.
+ */
public interface IStatefulStorage {
/**
* Initialize the Stateful Storage
- *
+ * @param topologyName The name of the topology.
* @param conf An unmodifiableMap containing basic configuration
- * Attempts to modify the returned map,
- * whether direct or via its collection views, result in an UnsupportedOperationException.
*/
- void init(Map<String, Object> conf) throws StatefulStorageException;
+ void init(String topologyName, final Map<String, Object> conf)
+ throws StatefulStorageException;
/**
* Closes the Stateful Storage
*/
void close();
- // Store the checkpoint
- void store(Checkpoint checkpoint) throws StatefulStorageException;
-
- // Retrieve the checkpoint
- Checkpoint restore(String topologyName, String checkpointId,
- PhysicalPlans.Instance instanceInfo) throws StatefulStorageException;
-
- // TODO(mfu): We should refactor all interfaces in IStatefulStorage,
- // TODO(mfu): instead providing Class Checkpoint, we should provide an Context class,
- // TODO(mfu): It should:
- // TODO(mfu): 1. Provide meta data access, like topologyName
- // TODO(mfu): 2. Provide utils method to parse the protobuf object, like getTaskId()
- // TODO(mfu): 3. Common methods, like getCheckpointDir()
- // Dispose the checkpoint
- void dispose(String topologyName, String oldestCheckpointId, boolean deleteAll)
- throws StatefulStorageException;
+ /**
+ * Store instance checkpoint.
+ * @param info The information (reference key) for the checkpoint partition.
+ * @param checkpoint The checkpoint data.
+ */
+ void storeCheckpoint(final CheckpointInfo info, final Checkpoint checkpoint)
+ throws StatefulStorageException;
+
+ /**
+ * Retrieve instance checkpoint.
+ * @param info The information (reference key) for the checkpoint partition.
+ * @return The checkpoint data from the specified blob id.
+ */
+ Checkpoint restoreCheckpoint(final CheckpointInfo info)
+ throws StatefulStorageException;
+
+ /**
+ * Store metadata for a component. Ideally in distributed storages this function should only
+ * be called once for each component. In local storages, the function should be called by
+ * every instance and the data should be stored with the checkpoint data for each partition.
+ * @param info The information (reference key) for the checkpoint partition.
+ * @param metadata The checkpoint metadata from a component.
+ */
+ void storeComponentMetaData(final CheckpointInfo info, final CheckpointMetadata metadata)
+ throws StatefulStorageException;
+
+ /**
+ * Retrieve component metadata.
+ * @param info The information (reference key) for the checkpoint partition.
+ * @return The checkpoint metadata for the component.
+ */
+ CheckpointMetadata restoreComponentMetadata(final CheckpointInfo info)
+ throws StatefulStorageException;
+
+ /**
+ * Dispose checkpoints.
+ * @param oldestCheckpointPreserved The oldest checkpoint id to be preserved. All checkpoints
+ * before this id should be deleted.
+ * @param deleteAll If true, ignore the checkpoint id and delete all checkpoints.
+ */
+ void dispose(String oldestCheckpointPreserved, boolean deleteAll)
+ throws StatefulStorageException;
}
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
index a0d3247..9999f36 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
@@ -37,8 +37,9 @@ import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.dlog.DLInputStream;
import org.apache.heron.dlog.DLOutputStream;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
+import org.apache.heron.spi.statefulstorage.CheckpointMetadata;
import org.apache.heron.spi.statefulstorage.IStatefulStorage;
import org.apache.heron.spi.statefulstorage.StatefulStorageException;
@@ -52,6 +53,7 @@ public class DlogStorage implements IStatefulStorage {
private String checkpointNamespaceUriStr;
private URI checkpointNamespaceUri;
private int numReplicas = 3;
+ private String topologyName;
// the namespace instance
private final Supplier<NamespaceBuilder> nsBuilderSupplier;
@@ -66,11 +68,13 @@ public class DlogStorage implements IStatefulStorage {
}
@Override
- public void init(Map<String, Object> conf) throws StatefulStorageException {
+ public void init(String topology, Map<String, Object> conf)
+ throws StatefulStorageException {
LOG.info("Initializing ... Config: " + conf.toString());
LOG.info("Class path: " + System.getProperty("java.class.path"));
+ this.topologyName = topology;
checkpointNamespaceUriStr = (String) conf.get(NS_URI_KEY);
checkpointNamespaceUri = URI.create(checkpointNamespaceUriStr);
Integer numReplicasValue = (Integer) conf.get(NUM_REPLICAS_KEY);
@@ -128,12 +132,13 @@ public class DlogStorage implements IStatefulStorage {
}
@Override
- public void store(Checkpoint checkpoint) throws StatefulStorageException {
+ public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
+ throws StatefulStorageException {
String checkpointPath = getCheckpointPath(
- checkpoint.getTopologyName(),
- checkpoint.getCheckpointId(),
- checkpoint.getComponent(),
- checkpoint.getTaskId());
+ topologyName,
+ info.getCheckpointId(),
+ info.getComponent(),
+ info.getInstanceId());
OutputStream out = null;
try {
@@ -147,15 +152,13 @@ public class DlogStorage implements IStatefulStorage {
}
@Override
- public Checkpoint restore(String topologyName,
- String checkpointId,
- PhysicalPlans.Instance instanceInfo)
+ public Checkpoint restoreCheckpoint(CheckpointInfo info)
throws StatefulStorageException {
String checkpointPath = getCheckpointPath(
topologyName,
- checkpointId,
- instanceInfo.getInfo().getComponentName(),
- instanceInfo.getInfo().getTaskId());
+ info.getCheckpointId(),
+ info.getComponent(),
+ info.getInstanceId());
InputStream in = null;
CheckpointManager.InstanceStateCheckpoint state;
@@ -168,13 +171,24 @@ public class DlogStorage implements IStatefulStorage {
SysUtils.closeIgnoringExceptions(in);
}
- return new Checkpoint(topologyName, instanceInfo, state);
+ return new Checkpoint(state);
+ }
+
+ @Override
+ public void storeComponentMetaData(CheckpointInfo info, CheckpointMetadata metadata)
+ throws StatefulStorageException {
+ // TODO(nwang): To implement
+ }
+
+ @Override
+ public CheckpointMetadata restoreComponentMetadata(CheckpointInfo info)
+ throws StatefulStorageException {
+ // TODO(nwang): To implement
+ return null;
}
@Override
- public void dispose(String topologyName,
- String oldestCheckpointId,
- boolean deleteAll)
+ public void dispose(String oldestCheckpointId, boolean deleteAll)
throws StatefulStorageException {
// Currently dlog doesn't support recursive deletion. so we have to fetch all the checkpoints
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java
index 1c4bc0a..a1df8e8 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/hdfs/HDFSStorage.java
@@ -31,8 +31,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
+import org.apache.heron.spi.statefulstorage.CheckpointMetadata;
import org.apache.heron.spi.statefulstorage.IStatefulStorage;
import org.apache.heron.spi.statefulstorage.StatefulStorageException;
@@ -46,12 +47,15 @@ public class HDFSStorage implements IStatefulStorage {
private String checkpointRootPath;
private FileSystem fileSystem;
+ private String topologyName;
@Override
- public void init(Map<String, Object> conf) throws StatefulStorageException {
+ public void init(String topology, final Map<String, Object> conf)
+ throws StatefulStorageException {
LOG.info("Initializing... Config: " + conf.toString());
LOG.info("Class path: " + System.getProperty("java.class.path"));
+ this.topologyName = topology;
checkpointRootPath = (String) conf.get(ROOT_PATH_KEY);
// Notice, we pass the config folder via classpath
@@ -73,17 +77,16 @@ public class HDFSStorage implements IStatefulStorage {
}
@Override
- public void store(Checkpoint checkpoint) throws StatefulStorageException {
- Path path = new Path(getCheckpointPath(checkpoint.getTopologyName(),
- checkpoint.getCheckpointId(),
- checkpoint.getComponent(),
- checkpoint.getTaskId()));
+ public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
+ throws StatefulStorageException {
+ Path path = new Path(getCheckpointPath(info.getCheckpointId(),
+ info.getComponent(),
+ info.getInstanceId()));
// We need to ensure the existence of directories structure,
// since it is not guaranteed that FileSystem.create(..) always creates parents' dirs.
- String checkpointDir = getCheckpointDir(checkpoint.getTopologyName(),
- checkpoint.getCheckpointId(),
- checkpoint.getComponent());
+ String checkpointDir = getCheckpointDir(info.getCheckpointId(),
+ info.getComponent());
createDir(checkpointDir);
FSDataOutputStream out = null;
@@ -98,11 +101,11 @@ public class HDFSStorage implements IStatefulStorage {
}
@Override
- public Checkpoint restore(String topologyName, String checkpointId,
- PhysicalPlans.Instance instanceInfo) throws StatefulStorageException {
- Path path = new Path(getCheckpointPath(topologyName, checkpointId,
- instanceInfo.getInfo().getComponentName(),
- instanceInfo.getInfo().getTaskId()));
+ public Checkpoint restoreCheckpoint(CheckpointInfo info)
+ throws StatefulStorageException {
+ Path path = new Path(getCheckpointPath(info.getCheckpointId(),
+ info.getComponent(),
+ info.getInstanceId()));
FSDataInputStream in = null;
CheckpointManager.InstanceStateCheckpoint state = null;
@@ -115,13 +118,26 @@ public class HDFSStorage implements IStatefulStorage {
} finally {
SysUtils.closeIgnoringExceptions(in);
}
- return new Checkpoint(topologyName, instanceInfo, state);
+ return new Checkpoint(state);
}
@Override
- public void dispose(String topologyName, String oldestCheckpointPreserved,
- boolean deleteAll) throws StatefulStorageException {
- String topologyCheckpointRoot = getTopologyCheckpointRoot(topologyName);
+ public void storeComponentMetaData(CheckpointInfo info, CheckpointMetadata metadata)
+ throws StatefulStorageException {
+ // TODO(nwang): To implement. Until then this is a no-op and the metadata is dropped.
+ }
+
+ @Override
+ public CheckpointMetadata restoreComponentMetadata(CheckpointInfo info)
+ throws StatefulStorageException {
+ // TODO(nwang): To implement. Until then no metadata is stored, so null is returned.
+ return null;
+ }
+
+ @Override
+ public void dispose(String oldestCheckpointPreserved, boolean deleteAll)
+ throws StatefulStorageException {
+ String topologyCheckpointRoot = getTopologyCheckpointRoot();
Path topologyRootPath = new Path(topologyCheckpointRoot);
if (deleteAll) {
@@ -177,18 +193,16 @@ public class HDFSStorage implements IStatefulStorage {
}
}
- private String getTopologyCheckpointRoot(String topologyName) {
+ private String getTopologyCheckpointRoot() {
return String.format("%s/%s", checkpointRootPath, topologyName);
}
- private String getCheckpointDir(String topologyName, String checkpointId, String componentName) {
+ private String getCheckpointDir(String checkpointId, String componentName) {
return String.format("%s/%s/%s",
- getTopologyCheckpointRoot(topologyName), checkpointId, componentName);
+ getTopologyCheckpointRoot(), checkpointId, componentName);
}
- private String getCheckpointPath(String topologyName, String checkpointId,
- String componentName, int taskId) {
- return String.format("%s/%d", getCheckpointDir(topologyName, checkpointId, componentName),
- taskId);
+ private String getCheckpointPath(String checkpointId, String componentName, int instanceId) {
+ return String.format("%s/%d", getCheckpointDir(checkpointId, componentName), instanceId);
}
}
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
index 60fdc53..f7c386a 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
@@ -28,8 +28,9 @@ import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
-import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
+import org.apache.heron.spi.statefulstorage.CheckpointMetadata;
import org.apache.heron.spi.statefulstorage.IStatefulStorage;
import org.apache.heron.spi.statefulstorage.StatefulStorageException;
@@ -42,10 +43,13 @@ public class LocalFileSystemStorage implements IStatefulStorage {
private static final int DEFAULT_MAX_CHECKPOINTS = 10;
private String checkpointRootPath;
+ private String topologyName;
private int maxCheckpoints;
@Override
- public void init(Map<String, Object> conf) throws StatefulStorageException {
+ public void init(String topology, final Map<String, Object> conf)
+ throws StatefulStorageException {
+ this.topologyName = topology;
checkpointRootPath = (String) conf.get(ROOT_PATH_KEY);
maxCheckpoints = (int) conf.getOrDefault(MAX_CHECKPOINTS_KEY, DEFAULT_MAX_CHECKPOINTS);
@@ -63,20 +67,20 @@ public class LocalFileSystemStorage implements IStatefulStorage {
}
@Override
- public void store(Checkpoint checkpoint) throws StatefulStorageException {
+ public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
+ throws StatefulStorageException {
// heron doesn't clean checkpoints stored on local disk automatically
// localFS cleans checkpoints before store and limits the number of checkpoints saved
- String rootPath = getTopologyCheckpointRoot(checkpoint.getTopologyName());
+ String rootPath = getTopologyCheckpointRoot();
cleanCheckpoints(new File(rootPath), maxCheckpoints);
- String path = getCheckpointPath(checkpoint.getTopologyName(), checkpoint.getCheckpointId(),
- checkpoint.getComponent(), checkpoint.getTaskId());
+ String path = getCheckpointPath(info.getCheckpointId(),
+ info.getComponent(), info.getInstanceId());
// We would try to create but we would not enforce this operation successful,
// since it is possible already created by others
- String checkpointDir = getCheckpointDir(checkpoint.getTopologyName(),
- checkpoint.getCheckpointId(),
- checkpoint.getComponent());
+ String checkpointDir = getCheckpointDir(info.getCheckpointId(),
+ info.getComponent());
FileUtils.createDirectory(checkpointDir);
// Do a check after the attempt
@@ -93,11 +97,10 @@ public class LocalFileSystemStorage implements IStatefulStorage {
}
@Override
- public Checkpoint restore(String topologyName, String checkpointId,
- PhysicalPlans.Instance instanceInfo) throws StatefulStorageException {
- String path = getCheckpointPath(topologyName, checkpointId,
- instanceInfo.getInfo().getComponentName(),
- instanceInfo.getInfo().getTaskId());
+ public Checkpoint restoreCheckpoint(CheckpointInfo info)
+ throws StatefulStorageException {
+ String path = getCheckpointPath(info.getCheckpointId(), info.getComponent(),
+ info.getInstanceId());
byte[] res = FileUtils.readFromFile(path);
if (res.length != 0) {
@@ -109,16 +112,29 @@ public class LocalFileSystemStorage implements IStatefulStorage {
} catch (InvalidProtocolBufferException e) {
throw new StatefulStorageException("Failed to parse the data", e);
}
- return new Checkpoint(topologyName, instanceInfo, state);
+ return new Checkpoint(state);
} else {
throw new StatefulStorageException("Failed to parse the data");
}
}
@Override
- public void dispose(String topologyName, String oldestCheckpointPreserved,
- boolean deleteAll) throws StatefulStorageException {
- String topologyCheckpointRoot = getTopologyCheckpointRoot(topologyName);
+ public void storeComponentMetaData(CheckpointInfo info, CheckpointMetadata metadata)
+ throws StatefulStorageException {
+ // TODO(nwang): To implement. Until then this is a no-op and the metadata is dropped.
+ }
+
+ @Override
+ public CheckpointMetadata restoreComponentMetadata(CheckpointInfo info)
+ throws StatefulStorageException {
+ // TODO(nwang): To implement. Until then no metadata is stored, so null is returned.
+ return null;
+ }
+
+ @Override
+ public void dispose(String oldestCheckpointPreserved, boolean deleteAll)
+ throws StatefulStorageException {
+ String topologyCheckpointRoot = getTopologyCheckpointRoot();
if (deleteAll) {
// Clean all checkpoint states
@@ -148,6 +164,10 @@ public class LocalFileSystemStorage implements IStatefulStorage {
}
}
+ private String getTopologyCheckpointRoot() {
+ return String.format("%s/%s", checkpointRootPath, topologyName);
+ }
+
protected void cleanCheckpoints(File rootFile, int remaining) throws StatefulStorageException {
if (FileUtils.isDirectoryExists(rootFile.getAbsolutePath())
&& FileUtils.hasChildren(rootFile.getAbsolutePath())) {
@@ -166,18 +186,11 @@ public class LocalFileSystemStorage implements IStatefulStorage {
}
}
- private String getTopologyCheckpointRoot(String topologyName) {
- return String.format("%s/%s", checkpointRootPath, topologyName);
- }
-
- private String getCheckpointDir(String topologyName, String checkpointId, String componentName) {
- return String.format("%s/%s/%s",
- getTopologyCheckpointRoot(topologyName), checkpointId, componentName);
+ private String getCheckpointDir(String checkpointId, String componentName) {
+ return String.format("%s/%s/%s", getTopologyCheckpointRoot(), checkpointId, componentName);
}
- private String getCheckpointPath(String topologyName, String checkpointId,
- String componentName, int taskId) {
- return String.format("%s/%d", getCheckpointDir(topologyName, checkpointId, componentName),
- taskId);
+ private String getCheckpointPath(String checkpointId, String componentName, int instanceId) {
+ return String.format("%s/%d", getCheckpointDir(checkpointId, componentName), instanceId);
}
}
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
index d9102b1..6e55c3c 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
@@ -43,6 +43,7 @@ import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
import org.apache.heron.statefulstorage.StatefulStorageTestContext;
import static org.junit.Assert.assertEquals;
@@ -63,7 +64,7 @@ public class DlogStorageTest {
private static final String ROOT_URI = "distributedlog://127.0.0.1/heron/statefulstorage";
private PhysicalPlans.Instance instance;
- private CheckpointManager.InstanceStateCheckpoint instanceStateCheckpoint;
+ private CheckpointManager.InstanceStateCheckpoint checkpointPartition;
private DlogStorage dlogStorage;
private NamespaceBuilder mockNsBuilder;
@@ -82,11 +83,11 @@ public class DlogStorageTest {
when(mockNsBuilder.build()).thenReturn(mockNamespace);
dlogStorage = new DlogStorage(() -> mockNsBuilder);
- dlogStorage.init(config);
+ dlogStorage.init(StatefulStorageTestContext.TOPOLOGY_NAME, config);
dlogStorage = spy(dlogStorage);
instance = StatefulStorageTestContext.getInstance();
- instanceStateCheckpoint = StatefulStorageTestContext.getInstanceStateCheckpoint();
+ checkpointPartition = StatefulStorageTestContext.getInstanceStateCheckpoint();
}
@After
@@ -100,15 +101,16 @@ public class DlogStorageTest {
CheckpointManager.InstanceStateCheckpoint mockCheckpointState =
mock(CheckpointManager.InstanceStateCheckpoint.class);
- Checkpoint checkpoint =
- new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, mockCheckpointState);
+ final CheckpointInfo info = new CheckpointInfo(
+ StatefulStorageTestContext.CHECKPOINT_ID, instance);
+ Checkpoint checkpoint = new Checkpoint(mockCheckpointState);
DistributedLogManager mockDLM = mock(DistributedLogManager.class);
when(mockNamespace.openLog(anyString())).thenReturn(mockDLM);
AppendOnlyStreamWriter mockWriter = mock(AppendOnlyStreamWriter.class);
when(mockDLM.getAppendOnlyStreamWriter()).thenReturn(mockWriter);
- dlogStorage.store(checkpoint);
+ dlogStorage.storeCheckpoint(info, checkpoint);
verify(mockWriter).markEndOfStream();
verify(mockWriter).close();
@@ -116,22 +118,18 @@ public class DlogStorageTest {
@Test
public void testRestore() throws Exception {
- Checkpoint restoreCheckpoint =
- new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, instanceStateCheckpoint);
-
InputStream mockInputStream = mock(InputStream.class);
doReturn(mockInputStream).when(dlogStorage).openInputStream(anyString());
PowerMockito.spy(CheckpointManager.InstanceStateCheckpoint.class);
- PowerMockito.doReturn(instanceStateCheckpoint)
+ PowerMockito.doReturn(checkpointPartition)
.when(CheckpointManager.InstanceStateCheckpoint.class,
"parseFrom", mockInputStream);
- dlogStorage.restore(
- StatefulStorageTestContext.TOPOLOGY_NAME,
- StatefulStorageTestContext.CHECKPOINT_ID,
- instance);
- assertEquals(restoreCheckpoint.getCheckpoint(), instanceStateCheckpoint);
+ final CheckpointInfo info = new CheckpointInfo(
+ StatefulStorageTestContext.CHECKPOINT_ID, instance);
+ Checkpoint restoreCheckpoint = dlogStorage.restoreCheckpoint(info);
+ assertEquals(checkpointPartition, restoreCheckpoint.getCheckpoint());
}
@Test
@@ -167,10 +165,7 @@ public class DlogStorageTest {
when(mockCheckpoint1.getLogs()).thenReturn(chkp1Tasks.iterator());
when(mockCheckpoint2.getLogs()).thenReturn(chkp2Tasks.iterator());
- dlogStorage.dispose(
- StatefulStorageTestContext.TOPOLOGY_NAME,
- "checkpoint0",
- true);
+ dlogStorage.dispose("checkpoint0", true);
verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task1"));
verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task2"));
@@ -211,10 +206,7 @@ public class DlogStorageTest {
when(mockCheckpoint1.getLogs()).thenReturn(chkp1Tasks.iterator());
when(mockCheckpoint2.getLogs()).thenReturn(chkp2Tasks.iterator());
- dlogStorage.dispose(
- StatefulStorageTestContext.TOPOLOGY_NAME,
- "checkpoint0",
- false);
+ dlogStorage.dispose("checkpoint0", false);
verify(mockCheckpoint1, times(0)).deleteLog(eq("component1_task1"));
verify(mockCheckpoint1, times(0)).deleteLog(eq("component1_task2"));
@@ -255,10 +247,7 @@ public class DlogStorageTest {
when(mockCheckpoint1.getLogs()).thenReturn(chkp1Tasks.iterator());
when(mockCheckpoint2.getLogs()).thenReturn(chkp2Tasks.iterator());
- dlogStorage.dispose(
- StatefulStorageTestContext.TOPOLOGY_NAME,
- "checkpoint2",
- false);
+ dlogStorage.dispose("checkpoint2", false);
verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task1"));
verify(mockCheckpoint1, times(1)).deleteLog(eq("component1_task2"));
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java
index 1eeb4cc..ff65f6e 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/hdfs/HDFSStorageTest.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
import org.apache.heron.statefulstorage.StatefulStorageTestContext;
import static org.junit.Assert.assertEquals;
@@ -77,7 +78,7 @@ public class HDFSStorageTest {
PowerMockito.doReturn(mockFileSystem).when(FileSystem.class, "get", any(Configuration.class));
hdfsStorage = spy(HDFSStorage.class);
- hdfsStorage.init(config);
+ hdfsStorage.init(StatefulStorageTestContext.TOPOLOGY_NAME, config);
instance = StatefulStorageTestContext.getInstance();
instanceCheckpointState = StatefulStorageTestContext.getInstanceStateCheckpoint();
@@ -94,41 +95,40 @@ public class HDFSStorageTest {
CheckpointManager.InstanceStateCheckpoint mockCheckpointState =
mock(CheckpointManager.InstanceStateCheckpoint.class);
- Checkpoint checkpoint =
- new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, mockCheckpointState);
+ Checkpoint checkpoint = new Checkpoint(mockCheckpointState);
FSDataOutputStream mockFSDateOutputStream = mock(FSDataOutputStream.class);
when(mockFileSystem.create(any(Path.class))).thenReturn(mockFSDateOutputStream);
doNothing().when(hdfsStorage).createDir(anyString());
- hdfsStorage.store(checkpoint);
+ final CheckpointInfo info = new CheckpointInfo(
+ StatefulStorageTestContext.CHECKPOINT_ID, instance);
+ hdfsStorage.storeCheckpoint(info, checkpoint);
verify(mockCheckpointState).writeTo(mockFSDateOutputStream);
}
@Test
public void testRestore() throws Exception {
- Checkpoint restoreCheckpoint =
- new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, instanceCheckpointState);
-
FSDataInputStream mockFSDataInputStream = mock(FSDataInputStream.class);
when(mockFileSystem.open(any(Path.class))).thenReturn(mockFSDataInputStream);
PowerMockito.spy(CheckpointManager.InstanceStateCheckpoint.class);
PowerMockito.doReturn(instanceCheckpointState)
- .when(CheckpointManager.InstanceStateCheckpoint.class, "parseFrom", mockFSDataInputStream);
+ .when(CheckpointManager.InstanceStateCheckpoint.class, "parseFrom",
+ mockFSDataInputStream);
- hdfsStorage.restore(StatefulStorageTestContext.TOPOLOGY_NAME,
+ final CheckpointInfo info = new CheckpointInfo(
StatefulStorageTestContext.CHECKPOINT_ID, instance);
+ Checkpoint restoreCheckpoint = hdfsStorage.restoreCheckpoint(info);
assertEquals(restoreCheckpoint.getCheckpoint(), instanceCheckpointState);
}
@Test
public void testDisposeAll() throws Exception {
- hdfsStorage.dispose(StatefulStorageTestContext.TOPOLOGY_NAME,
- StatefulStorageTestContext.CHECKPOINT_ID, true);
+ hdfsStorage.dispose(StatefulStorageTestContext.CHECKPOINT_ID, true);
verify(mockFileSystem).delete(any(Path.class), eq(true));
}
@@ -149,8 +149,7 @@ public class HDFSStorageTest {
.thenReturn(mockFileStatus)
.thenReturn(emptyFileStatus);
- hdfsStorage.dispose(StatefulStorageTestContext.TOPOLOGY_NAME,
- StatefulStorageTestContext.CHECKPOINT_ID, false);
+ hdfsStorage.dispose(StatefulStorageTestContext.CHECKPOINT_ID, false);
verify(mockFileSystem, times(2)).delete(any(Path.class), eq(true));
}
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
index 2875f64..e6d8481 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
@@ -35,6 +35,7 @@ import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.proto.ckptmgr.CheckpointManager.InstanceStateCheckpoint;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.spi.statefulstorage.Checkpoint;
+import org.apache.heron.spi.statefulstorage.CheckpointInfo;
import org.apache.heron.statefulstorage.StatefulStorageTestContext;
import static org.junit.Assert.assertEquals;
@@ -61,7 +62,7 @@ public class LocalFileSystemStorageTest {
config.put(StatefulStorageTestContext.ROOT_PATH_KEY, StatefulStorageTestContext.ROOT_PATH);
localFileSystemStorage = spy(new LocalFileSystemStorage());
- localFileSystemStorage.init(config);
+ localFileSystemStorage.init(StatefulStorageTestContext.TOPOLOGY_NAME, config);
instance = StatefulStorageTestContext.getInstance();
checkpoint = StatefulStorageTestContext.getInstanceStateCheckpoint();
@@ -85,7 +86,9 @@ public class LocalFileSystemStorageTest {
Checkpoint mockCheckpoint = mock(Checkpoint.class);
when(mockCheckpoint.getCheckpoint()).thenReturn(checkpoint);
- localFileSystemStorage.store(mockCheckpoint);
+ final CheckpointInfo info = new CheckpointInfo(
+ StatefulStorageTestContext.CHECKPOINT_ID, instance);
+ localFileSystemStorage.storeCheckpoint(info, mockCheckpoint);
PowerMockito.verifyStatic(times(1));
FileUtils.writeToFile(anyString(), eq(checkpoint.toByteArray()), eq(true));
@@ -118,12 +121,11 @@ public class LocalFileSystemStorageTest {
PowerMockito.doReturn(checkpoint.toByteArray())
.when(FileUtils.class, "readFromFile", anyString());
- Checkpoint ckpt =
- new Checkpoint(StatefulStorageTestContext.TOPOLOGY_NAME, instance, checkpoint);
-
- localFileSystemStorage.restore(StatefulStorageTestContext.TOPOLOGY_NAME,
+ final CheckpointInfo info = new CheckpointInfo(
StatefulStorageTestContext.CHECKPOINT_ID, instance);
+ Checkpoint ckpt = localFileSystemStorage.restoreCheckpoint(info);
+
assertEquals(checkpoint, ckpt.getCheckpoint());
}
@@ -133,7 +135,7 @@ public class LocalFileSystemStorageTest {
PowerMockito.doReturn(true).when(FileUtils.class, "deleteDir", anyString());
PowerMockito.doReturn(false).when(FileUtils.class, "isDirectoryExists", anyString());
- localFileSystemStorage.dispose(StatefulStorageTestContext.TOPOLOGY_NAME, "", true);
+ localFileSystemStorage.dispose("", true);
PowerMockito.verifyStatic(times(1));
FileUtils.deleteDir(anyString());
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
index 2784e67..c6e6a04 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
@@ -33,7 +33,8 @@ CkptMgrClient::CkptMgrClient(EventLoop* eventloop, const NetworkOptions& _option
std::function<void(const proto::system::Instance&,
const std::string&)> _ckpt_saved_watcher,
std::function<void(proto::system::StatusCode, sp_int32, sp_string,
- const proto::ckptmgr::InstanceStateCheckpoint&)> _ckpt_get_watcher,
+ const proto::ckptmgr::InstanceStateCheckpoint&)>
+ _ckpt_get_watcher,
std::function<void()> _register_watcher)
: Client(eventloop, _options),
topology_name_(_topology_name),
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.h b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
index 7ea30e1..2886cfb 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
@@ -37,7 +37,8 @@ class CkptMgrClient : public Client {
std::function<void(const proto::system::Instance&,
const std::string&)> _ckpt_saved_watcher,
std::function<void(proto::system::StatusCode, sp_int32, sp_string,
- const proto::ckptmgr::InstanceStateCheckpoint&)> _ckpt_get_watcher,
+ const proto::ckptmgr::InstanceStateCheckpoint&)>
+ _ckpt_get_watcher,
std::function<void()> _register_watcher);
virtual ~CkptMgrClient();
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index c85a894..5ad504a 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -82,7 +82,7 @@ class InstanceServer : public Server {
void InitiateStatefulCheckpoint(const sp_string& _checkpoint_tag);
// Send a RestoreInstanceStateRequest to _task_id asking it to restore itself from _state
virtual bool SendRestoreInstanceStateRequest(sp_int32 _task_id,
- const proto::ckptmgr::InstanceStateCheckpoint& _state);
+ const proto::ckptmgr::InstanceStateCheckpoint& _state);
// Send StartInstanceStatefulProcessing message to all instances so that they can start
// processing
void SendStartInstanceStatefulProcessing(const std::string& _ckpt_id);
diff --git a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
index f82c1d3..538c71d 100644
--- a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
+++ b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
@@ -141,8 +141,8 @@ void StatefulRestorer::GetCheckpoints() {
}
void StatefulRestorer::HandleCheckpointState(proto::system::StatusCode _status, sp_int32 _task_id,
- sp_string _checkpoint_id,
- const proto::ckptmgr::InstanceStateCheckpoint& _state) {
+ sp_string _checkpoint_id,
+ const proto::ckptmgr::InstanceStateCheckpoint& _state) {
LOG(INFO) << "Got InstanceState from checkpoint mgr for task " << _task_id
<< " and checkpoint " << _state.checkpoint_id();
multi_count_metric_->scope(METRIC_CKPT_RESPONSES)->incr();
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index d21bc61..a1444dd 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -82,8 +82,9 @@ class StMgr {
proto::system::HeronTupleSet* _message);
// Called when an instance does checkpoint and sends its checkpoint
// to the stmgr to save it
- void HandleStoreInstanceStateCheckpoint(const proto::ckptmgr::InstanceStateCheckpoint& _message,
- const proto::system::Instance& _instance);
+ void HandleStoreInstanceStateCheckpoint(
+ const proto::ckptmgr::InstanceStateCheckpoint& _message,
+ const proto::system::Instance& _instance);
void DrainInstanceData(sp_int32 _task_id, proto::system::HeronTupleSet2* _tuple);
// Send checkpoint message to this task_id
void DrainDownstreamCheckpoint(sp_int32 _task_id,
diff --git a/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp b/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp
index 5a4a77f..e2e8761 100644
--- a/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_ckptmgr_client.cpp
@@ -93,10 +93,11 @@ void DummyCkptMgrClient::DummySaveWatcher(const heron::proto::system::Instance&,
const std::string&) {
}
-void DummyCkptMgrClient::DummyGetWatcher(heron::proto::system::StatusCode,
- int32_t,
- const std::string&,
- const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
+void DummyCkptMgrClient::DummyGetWatcher(
+ heron::proto::system::StatusCode,
+ int32_t,
+ const std::string&,
+ const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
}
void DummyCkptMgrClient::DummyRegisterWatcher() {
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance_server.h b/heron/stmgr/tests/cpp/server/dummy_instance_server.h
index 988aa1f..3b739ba 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance_server.h
+++ b/heron/stmgr/tests/cpp/server/dummy_instance_server.h
@@ -68,7 +68,7 @@ class DummyInstanceServer : public heron::stmgr::InstanceServer {
virtual void SetAllInstancesConnectedToUs(bool val) { all_instances_connected_ = val; }
virtual bool SendRestoreInstanceStateRequest(sp_int32 _task_id,
- const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
+ const heron::proto::ckptmgr::InstanceStateCheckpoint&) {
restore_sent_.insert(_task_id);
return true;
}