You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/07/16 19:30:59 UTC
git commit: updated refs/heads/trunk to 02d9e6c
Repository: giraph
Updated Branches:
refs/heads/trunk 0a9017716 -> 02d9e6c25
GIRAPH-924: Fix checkpointing (edunov via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/02d9e6c2
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/02d9e6c2
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/02d9e6c2
Branch: refs/heads/trunk
Commit: 02d9e6c2533a3cd108f5e6feaf40f26e95deb64c
Parents: 0a90177
Author: Maja Kabiljo <ma...@fb.com>
Authored: Wed Jul 16 10:30:30 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Wed Jul 16 10:30:30 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../giraph/aggregators/AggregatorWrapper.java | 6 +-
.../java/org/apache/giraph/bsp/BspService.java | 24 ++
.../java/org/apache/giraph/comm/ServerData.java | 17 +
.../org/apache/giraph/conf/GiraphConstants.java | 19 +
.../apache/giraph/master/BspServiceMaster.java | 179 ++++----
.../giraph/partition/BasicPartitionOwner.java | 10 -
.../giraph/partition/HashMasterPartitioner.java | 6 +
.../giraph/partition/HashWorkerPartitioner.java | 5 +-
.../partition/MasterGraphPartitioner.java | 7 +
.../giraph/partition/PartitionBalancer.java | 4 +-
.../apache/giraph/partition/PartitionOwner.java | 16 -
.../partition/SimpleMasterPartitioner.java | 6 +
.../partition/SimpleWorkerPartitioner.java | 6 +-
.../partition/WorkerGraphPartitioner.java | 5 +-
.../giraph/utils/InternalVertexRunner.java | 260 ++++++++----
.../utils/io/ExtendedDataInputOutput.java | 2 +-
.../apache/giraph/worker/BspServiceWorker.java | 404 ++++++++++++-------
.../org/apache/giraph/worker/WorkerContext.java | 16 +-
.../SimpleRangePartitionFactoryTest.java | 2 +-
.../org/apache/giraph/TestCheckpointing.java | 266 ++++++++++++
21 files changed, 882 insertions(+), 380 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7287490..4207339 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-924: Fix checkpointing (edunov via majakabiljo)
+
GIRAPH-921: Create ByteValueVertex to store vertex values as bytes without object instance (akyrola via majakabiljo)
GIRAPH-929: setIfUnset for EnumConfOption (pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
index 9613805..7150402 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
@@ -44,13 +44,9 @@ public class AggregatorWrapper<A extends Writable> {
* @param persistent False iff aggregator should be reset at the end of
* each super step
* @param conf Configuration
- * @throws IllegalAccessException
- * @throws InstantiationException
*/
public AggregatorWrapper(Class<? extends Aggregator<A>> aggregatorClass,
- boolean persistent, ImmutableClassesGiraphConfiguration conf) throws
- IllegalAccessException,
- InstantiationException {
+ boolean persistent, ImmutableClassesGiraphConfiguration conf) {
this.persistent = persistent;
currentAggregator = ReflectionUtils.newInstance(aggregatorClass, conf);
changed = false;
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 2e35373..02577b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -54,6 +54,7 @@ import java.util.Collections;
import java.util.List;
import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
+import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID;
/**
* Zookeeper-based implementation of {@link CentralizedService}.
@@ -198,6 +199,11 @@ public abstract class BspService<I extends WritableComparable,
*/
public static final String CHECKPOINT_VALID_POSTFIX = ".valid";
/**
+ * If at the end of a checkpoint file,
+ * indicates that we store WorkerContext and aggregator handler data.
+ */
+ public static final String CHECKPOINT_DATA_POSTFIX = ".data";
+ /**
* If at the end of a checkpoint file, indicates the stitched checkpoint
* file prefixes. A checkpoint is not valid if this file does not exist.
*/
@@ -226,6 +232,8 @@ public abstract class BspService<I extends WritableComparable,
protected final String cleanedUpPath;
/** Path to the checkpoint's root (including job id) */
protected final String checkpointBasePath;
+ /** Old checkpoint in case we want to restart some job */
+ protected final String savedCheckpointBasePath;
/** Path to the master election path */
protected final String masterElectionPath;
/** Stores progress info of this worker */
@@ -350,6 +358,12 @@ public abstract class BspService<I extends WritableComparable,
EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
+
+ String restartJobId = RESTART_JOB_ID.get(conf);
+ savedCheckpointBasePath =
+ CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
+ CHECKPOINT_DIRECTORY.getDefaultValue() + "/" +
+ (restartJobId == null ? getJobId() : restartJobId));
checkpointBasePath =
CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
@@ -575,6 +589,16 @@ public abstract class BspService<I extends WritableComparable,
}
/**
+ * In case when we restart another job this will give us a path
+ * to saved checkpoint.
+ * @param superstep superstep to use
+ * @return Directory path for restarted job based on the superstep
+ */
+ public final String getSavedCheckpointBasePath(long superstep) {
+ return savedCheckpointBasePath + "/" + superstep;
+ }
+
+ /**
* Get the checkpoint from a finalized checkpoint path
*
* @param finalizedPath Path of the finalized checkpoint
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 85bfe04..036510e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -161,6 +161,23 @@ public class ServerData<I extends WritableComparable,
return (MessageStore<I, M>) currentMessageStore;
}
+ /**
+ * Re-initialize message stores.
+ * Discards old values if any.
+ * @throws IOException
+ */
+ public void resetMessageStores() throws IOException {
+ if (currentMessageStore != null) {
+ currentMessageStore.clearAll();
+ currentMessageStore = null;
+ }
+ if (incomingMessageStore != null) {
+ incomingMessageStore.clearAll();
+ incomingMessageStore = null;
+ }
+ prepareSuperstep();
+ }
+
/** Prepare for next super step */
public void prepareSuperstep() {
if (currentMessageStore != null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 81c0e0b..3d16e9c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -880,6 +880,13 @@ public interface GiraphConstants {
String RESTART_SUPERSTEP = "giraph.restartSuperstep";
/**
+ * If application is restarted manually we need to specify job ID
+ * to restart from.
+ */
+ StrConfOption RESTART_JOB_ID = new StrConfOption("giraph.restart.jobId",
+ null, "Which job ID should I try to restart?");
+
+ /**
* Base ZNode for Giraph's state in the ZooKeeper cluster. Must be a root
* znode on the cluster beginning with "/"
*/
@@ -1124,5 +1131,17 @@ public interface GiraphConstants {
IntConfOption HDFS_FILE_CREATION_RETRY_WAIT_MS =
new IntConfOption("giraph.hdfs.file.creation.retry.wait.ms", 30_000,
"Milliseconds to wait prior to retrying creation of an HDFS file");
+
+ /** Number of threads for writing and reading checkpoints */
+ IntConfOption NUM_CHECKPOINT_IO_THREADS =
+ new IntConfOption("giraph.checkpoint.io.threads", 8,
+ "Number of threads for writing and reading checkpoints");
+
+ /** Compression algorithm to be used for checkpointing */
+ StrConfOption CHECKPOINT_COMPRESSION_CODEC =
+ new StrConfOption("giraph.checkpoint.compression.codec",
+ "org.apache.hadoop.io.compress.DefaultCodec",
+ "Defines compression algorithm we will be using for " +
+ "storing checkpoint");
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 0275395..e129390 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -18,6 +18,8 @@
package org.apache.giraph.master;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.commons.io.FilenameUtils;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
@@ -42,6 +44,7 @@ import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.partition.BasicPartitionOwner;
import org.apache.giraph.partition.MasterGraphPartitioner;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
@@ -100,10 +103,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
@@ -272,6 +273,7 @@ public class BspServiceMaster<I extends WritableComparable,
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL,
true);
+ LOG.info("setJobState: " + jobState);
} catch (KeeperException.NodeExistsException e) {
throw new IllegalStateException(
"setJobState: Imposible that " +
@@ -740,20 +742,18 @@ public class BspServiceMaster<I extends WritableComparable,
* finalized checkpoint file and setting it.
*
* @param superstep Checkpoint set to examine.
- * @param partitionOwners Partition owners to modify with checkpoint
- * prefixes
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
+ * @return Collection of generated partition owners.
*/
- private void prepareCheckpointRestart(
- long superstep,
- Collection<PartitionOwner> partitionOwners)
+ private Collection<PartitionOwner> prepareCheckpointRestart(long superstep)
throws IOException, KeeperException, InterruptedException {
+ List<PartitionOwner> partitionOwners = new ArrayList<>();
FileSystem fs = getFs();
- List<Path> validMetadataPathList = new ArrayList<Path>();
String finalizedCheckpointPath =
- getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+ getSavedCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+ LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
DataInputStream finalizedStream =
fs.open(new Path(finalizedCheckpointPath));
GlobalStats globalStats = new GlobalStats();
@@ -763,51 +763,46 @@ public class BspServiceMaster<I extends WritableComparable,
superstepClasses.readFields(finalizedStream);
getConfiguration().updateSuperstepClasses(superstepClasses);
int prefixFileCount = finalizedStream.readInt();
- for (int i = 0; i < prefixFileCount; ++i) {
- String metadataFilePath =
- finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX;
- validMetadataPathList.add(new Path(metadataFilePath));
- }
- aggregatorHandler.readFields(finalizedStream);
- masterCompute.readFields(finalizedStream);
- finalizedStream.close();
- Map<Integer, PartitionOwner> idOwnerMap =
- new HashMap<Integer, PartitionOwner>();
- for (PartitionOwner partitionOwner : partitionOwners) {
- if (idOwnerMap.put(partitionOwner.getPartitionId(),
- partitionOwner) != null) {
- throw new IllegalStateException(
- "prepareCheckpointRestart: Duplicate partition " +
- partitionOwner);
- }
+ Int2ObjectMap<WorkerInfo> workersMap = new Int2ObjectOpenHashMap<>();
+ for (WorkerInfo worker : chosenWorkerInfoList) {
+ workersMap.put(worker.getTaskId(), worker);
}
- // Reading the metadata files. Simply assign each partition owner
- // the correct file prefix based on the partition id.
- for (Path metadataPath : validMetadataPathList) {
- String checkpointFilePrefix = metadataPath.toString();
- checkpointFilePrefix =
- checkpointFilePrefix.substring(
- 0,
- checkpointFilePrefix.length() -
- CHECKPOINT_METADATA_POSTFIX.length());
- DataInputStream metadataStream = fs.open(metadataPath);
+ String checkpointFile =
+ finalizedStream.readUTF();
+ for (int i = 0; i < prefixFileCount; ++i) {
+ int mrTaskId = finalizedStream.readInt();
+
+ DataInputStream metadataStream = fs.open(new Path(checkpointFile +
+ "." + mrTaskId + CHECKPOINT_METADATA_POSTFIX));
long partitions = metadataStream.readInt();
- for (long i = 0; i < partitions; ++i) {
- long dataPos = metadataStream.readLong();
+ WorkerInfo worker = workersMap.get(mrTaskId);
+ for (long p = 0; p < partitions; ++p) {
int partitionId = metadataStream.readInt();
- PartitionOwner partitionOwner = idOwnerMap.get(partitionId);
- if (LOG.isInfoEnabled()) {
- LOG.info("prepareSuperstepRestart: File " + metadataPath +
- " with position " + dataPos +
- ", partition id = " + partitionId +
- " assigned to " + partitionOwner);
- }
- partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix);
+ PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId,
+ worker);
+ partitionOwners.add(partitionOwner);
+ LOG.info("prepareCheckpointRestart partitionId=" + partitionId +
+ " assigned to " + partitionOwner);
}
metadataStream.close();
}
+ //Ordering appears to be important as of right now we rely on this ordering
+ //in WorkerGraphPartitioner
+ Collections.sort(partitionOwners, new Comparator<PartitionOwner>() {
+ @Override
+ public int compare(PartitionOwner p1, PartitionOwner p2) {
+ return Integer.compare(p1.getPartitionId(), p2.getPartitionId());
+ }
+ });
+
+
+ aggregatorHandler.readFields(finalizedStream);
+ masterCompute.readFields(finalizedStream);
+ finalizedStream.close();
+
+ return partitionOwners;
}
@Override
@@ -1085,11 +1080,9 @@ public class BspServiceMaster<I extends WritableComparable,
getZkExt().getData(superstepFinishedNode, false, null));
finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
+ finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep));
for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
- String chosenWorkerInfoPrefix =
- getCheckpointBasePath(superstep) + "." +
- chosenWorkerInfo.getHostnameId();
- finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix);
+ finalizedOutputStream.writeInt(chosenWorkerInfo.getTaskId());
}
aggregatorHandler.write(finalizedOutputStream);
masterCompute.write(finalizedOutputStream);
@@ -1104,18 +1097,10 @@ public class BspServiceMaster<I extends WritableComparable,
* the workers will know how to do the exchange. If this was a restarted
* superstep, then make sure to provide information on where to find the
* checkpoint file.
- *
- * @param allPartitionStatsList All partition stats
- * @param chosenWorkerInfoList All the chosen worker infos
- * @param masterGraphPartitioner Master graph partitioner
*/
- private void assignPartitionOwners(
- List<PartitionStats> allPartitionStatsList,
- List<WorkerInfo> chosenWorkerInfoList,
- MasterGraphPartitioner<I, V, E> masterGraphPartitioner) {
+ private void assignPartitionOwners() {
Collection<PartitionOwner> partitionOwners;
- if (getSuperstep() == INPUT_SUPERSTEP ||
- getSuperstep() == getRestartedSuperstep()) {
+ if (getSuperstep() == INPUT_SUPERSTEP) {
partitionOwners =
masterGraphPartitioner.createInitialPartitionOwners(
chosenWorkerInfoList, maxWorkers);
@@ -1123,23 +1108,10 @@ public class BspServiceMaster<I extends WritableComparable,
throw new IllegalStateException(
"assignAndExchangePartitions: No partition owners set");
}
- } else {
- partitionOwners =
- masterGraphPartitioner.generateChangedPartitionOwners(
- allPartitionStatsList,
- chosenWorkerInfoList,
- maxWorkers,
- getSuperstep());
-
- PartitionUtils.analyzePartitionStats(partitionOwners,
- allPartitionStatsList);
- }
- checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
-
- // If restarted, prepare the checkpoint restart
- if (getRestartedSuperstep() == getSuperstep()) {
+ } else if (getRestartedSuperstep() == getSuperstep()) {
+ // If restarted, prepare the checkpoint restart
try {
- prepareCheckpointRestart(getSuperstep(), partitionOwners);
+ partitionOwners = prepareCheckpointRestart(getSuperstep());
} catch (IOException e) {
throw new IllegalStateException(
"assignPartitionOwners: IOException on preparing", e);
@@ -1151,7 +1123,21 @@ public class BspServiceMaster<I extends WritableComparable,
"assignPartitionOwners: InteruptedException on preparing",
e);
}
+ masterGraphPartitioner.setPartitionOwners(partitionOwners);
+ } else {
+ partitionOwners =
+ masterGraphPartitioner.generateChangedPartitionOwners(
+ allPartitionStatsList,
+ chosenWorkerInfoList,
+ maxWorkers,
+ getSuperstep());
+
+ PartitionUtils.analyzePartitionStats(partitionOwners,
+ allPartitionStatsList);
}
+ checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
+
+
// There will be some exchange of partitions
if (!partitionOwners.isEmpty()) {
@@ -1240,18 +1226,9 @@ public class BspServiceMaster<I extends WritableComparable,
// 1. Remove all old input split data
// 2. Increase the application attempt and set to the correct checkpoint
// 3. Send command to all workers to restart their tasks
- try {
- getZkExt().deleteExt(vertexInputSplitsPaths.getPath(), -1,
- true);
- getZkExt().deleteExt(edgeInputSplitsPaths.getPath(), -1,
- true);
- } catch (InterruptedException e) {
- throw new RuntimeException(
- "restartFromCheckpoint: InterruptedException", e);
- } catch (KeeperException e) {
- throw new RuntimeException(
- "restartFromCheckpoint: KeeperException", e);
- }
+ zkDeleteNode(vertexInputSplitsPaths.getPath());
+ zkDeleteNode(edgeInputSplitsPaths.getPath());
+
setApplicationAttempt(getApplicationAttempt() + 1);
setCachedSuperstep(checkpoint);
setRestartedSuperstep(checkpoint);
@@ -1261,6 +1238,26 @@ public class BspServiceMaster<I extends WritableComparable,
}
/**
+ * Safely removes node from zookeeper.
+ * Ignores if node is already removed. Can only throw runtime exception if
+ * anything wrong.
+ * @param path path to the node to be removed.
+ */
+ private void zkDeleteNode(String path) {
+ try {
+ getZkExt().deleteExt(path, -1, true);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.info("zkDeleteNode: node has already been removed " + path);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ "zkDeleteNode: InterruptedException", e);
+ } catch (KeeperException e) {
+ throw new RuntimeException(
+ "zkDeleteNode: KeeperException", e);
+ }
+ }
+
+ /**
* Only get the finalized checkpoint files
*/
public static class FinalizedCheckpointPathFilter implements PathFilter {
@@ -1277,7 +1274,7 @@ public class BspServiceMaster<I extends WritableComparable,
if (lastCheckpointedSuperstep == -1) {
try {
FileStatus[] fileStatusArray =
- getFs().listStatus(new Path(checkpointBasePath),
+ getFs().listStatus(new Path(savedCheckpointBasePath),
new FinalizedCheckpointPathFilter());
if (fileStatusArray == null) {
return -1;
@@ -1582,9 +1579,7 @@ public class BspServiceMaster<I extends WritableComparable,
GiraphStats.getInstance().
getCurrentWorkers().setValue(chosenWorkerInfoList.size());
- assignPartitionOwners(allPartitionStatsList,
- chosenWorkerInfoList,
- masterGraphPartitioner);
+ assignPartitionOwners();
// We need to finalize aggregators from previous superstep (send them to
// worker owners) after new worker assignments
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
index 545d1af..b6cf813 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
@@ -103,16 +103,6 @@ public class BasicPartitionOwner implements PartitionOwner,
}
@Override
- public String getCheckpointFilesPrefix() {
- return checkpointFilesPrefix;
- }
-
- @Override
- public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
- this.checkpointFilesPrefix = checkpointFilesPrefix;
- }
-
- @Override
public void writeWithWorkerIds(DataOutput output) throws IOException {
output.writeInt(partitionId);
output.writeInt(workerInfo.getTaskId());
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
index 240687e..caede8c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
@@ -18,6 +18,7 @@
package org.apache.giraph.partition;
+import com.google.common.collect.Lists;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
@@ -75,6 +76,11 @@ public class HashMasterPartitioner<I extends WritableComparable,
}
@Override
+ public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
+ this.partitionOwnerList = Lists.newArrayList(partitionOwners);
+ }
+
+ @Override
public Collection<PartitionOwner> getCurrentPartitionOwners() {
return partitionOwnerList;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
index d833895..12aa417 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
@@ -65,10 +65,9 @@ public class HashWorkerPartitioner<I extends WritableComparable,
@Override
public PartitionExchange updatePartitionOwners(
WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners,
- PartitionStore<I, V, E> partitionStore) {
+ Collection<? extends PartitionOwner> masterSetPartitionOwners) {
return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
- myWorkerInfo, masterSetPartitionOwners, partitionStore);
+ myWorkerInfo, masterSetPartitionOwners);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
index 50c750a..d2363fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
@@ -47,6 +47,13 @@ public interface MasterGraphPartitioner<I extends WritableComparable,
Collection<WorkerInfo> availableWorkerInfos, int maxWorkers);
/**
+ * Sets partition owners for the graph.
+ * Used when loading from checkpoint.
+ * @param partitionOwners assigned partition owners.
+ */
+ void setPartitionOwners(Collection<PartitionOwner> partitionOwners);
+
+ /**
* After the worker stats have been merged to a single list, the master can
* use this information to send commands to the workers for any
* {@link Partition} changes. This protocol is specific to the
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
index 3454d62..0d8f3cf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
@@ -309,14 +309,12 @@ public class PartitionBalancer {
* @param myWorkerInfo Worker info
* @param masterSetPartitionOwners Master set partition owners, received
* prior to beginning the superstep
- * @param partitionStore Partition store for the given worker
* @return Information for the partition exchange.
*/
public static PartitionExchange updatePartitionOwners(
List<PartitionOwner> partitionOwnerList,
WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners,
- PartitionStore partitionStore) {
+ Collection<? extends PartitionOwner> masterSetPartitionOwners) {
partitionOwnerList.clear();
partitionOwnerList.addAll(masterSetPartitionOwners);
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
index 0ac74da..f303a09 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
@@ -69,22 +69,6 @@ public interface PartitionOwner extends Writable {
void setPreviousWorkerInfo(WorkerInfo workerInfo);
/**
- * If this is a restarted checkpoint, the worker will use this information
- * to determine where the checkpointed partition was stored on HDFS.
- *
- * @return Prefix of the checkpoint HDFS files for this partition, null if
- * this is not a restarted superstep.
- */
- String getCheckpointFilesPrefix();
-
- /**
- * Set the checkpoint files prefix. Master uses this.
- *
- * @param checkpointFilesPrefix HDFS checkpoint file prefix
- */
- void setCheckpointFilesPrefix(String checkpointFilesPrefix);
-
- /**
* Write to the output, but don't serialize the whole WorkerInfo,
* instead use just the task id
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
index f128f34..7d4c1cb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
@@ -74,6 +75,11 @@ public abstract class SimpleMasterPartitioner<I extends WritableComparable,
}
@Override
+ public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
+ partitionOwnerList = Lists.newArrayList(partitionOwners);
+ }
+
+ @Override
public Collection<PartitionOwner> generateChangedPartitionOwners(
Collection<PartitionStats> allPartitionStatsList,
Collection<WorkerInfo> availableWorkers,
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
index 3c0de44..0ee8d92 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
@@ -71,11 +71,9 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
@Override
public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners,
- PartitionStore<I, V, E> partitionStore) {
+ Collection<? extends PartitionOwner> masterSetPartitionOwners) {
PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
- partitionOwnerList, myWorkerInfo, masterSetPartitionOwners,
- partitionStore);
+ partitionOwnerList, myWorkerInfo, masterSetPartitionOwners);
extractAvailableWorkers();
return exchange;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
index 004ea81..211fedb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
@@ -75,14 +75,11 @@ public interface WorkerGraphPartitioner<I extends WritableComparable,
* @param myWorkerInfo Worker info.
* @param masterSetPartitionOwners Master set partition owners, received
* prior to beginning the superstep
- * @param partitionStore Partition store for this worker
- * (can be used to fill the return map of partitions to send)
* @return Information for the partition exchange.
*/
PartitionExchange updatePartitionOwners(
WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners,
- PartitionStore<I, V, E> partitionStore);
+ Collection<? extends PartitionOwner> masterSetPartitionOwners);
/**
* Get a collection of the {@link PartitionOwner} objects.
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 2c4606f..bb2865c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -142,81 +142,107 @@ public class InternalVertexRunner {
GiraphConfiguration conf,
String[] vertexInputData,
String[] edgeInputData) throws Exception {
- File tmpDir = null;
+ // Prepare input file, output folder and temporary folders
+ File tmpDir = FileUtils.createTestDir(conf.getComputationName());
try {
- // Prepare input file, output folder and temporary folders
- tmpDir = FileUtils.createTestDir(conf.getComputationName());
-
- File vertexInputFile = null;
- File edgeInputFile = null;
- if (conf.hasVertexInputFormat()) {
- vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
- }
- if (conf.hasEdgeInputFormat()) {
- edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
- }
+ return run(conf, vertexInputData, edgeInputData, null, tmpDir);
+ } finally {
+ FileUtils.delete(tmpDir);
+ }
+ }
- File outputDir = FileUtils.createTempDir(tmpDir, "output");
- File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
- File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
- File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
+ /**
+ * Attempts to run the vertex internally in the current JVM, reading from and
+ * writing to a temporary folder on local disk. Will start its own zookeeper
+ * instance.
+ *
+ *
+ * @param conf GiraphClasses specifying which types to use
+ * @param vertexInputData linewise vertex input data
+ * @param edgeInputData linewise edge input data
+ * @param checkpointsDir if set, will use this folder for storing
+ * checkpoints; if null, a "_checkpoints" folder is created under tmpDir.
+ * @param tmpDir file path for storing temporary files.
+ * @return linewise output data, or null if job fails
+ * @throws Exception if anything goes wrong
+ */
+ public static Iterable<String> run(
+ GiraphConfiguration conf,
+ String[] vertexInputData,
+ String[] edgeInputData,
+ String checkpointsDir,
+ File tmpDir) throws Exception {
+ File vertexInputFile = null;
+ File edgeInputFile = null;
+ if (conf.hasVertexInputFormat()) {
+ vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
+ }
+ if (conf.hasEdgeInputFormat()) {
+ edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
+ }
- // Write input data to disk
- if (conf.hasVertexInputFormat()) {
- FileUtils.writeLines(vertexInputFile, vertexInputData);
- }
- if (conf.hasEdgeInputFormat()) {
- FileUtils.writeLines(edgeInputFile, edgeInputData);
- }
+ File outputDir = FileUtils.createTempDir(tmpDir, "output");
+ File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
+ File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
+ // Write input data to disk
+ if (conf.hasVertexInputFormat()) {
+ FileUtils.writeLines(vertexInputFile, vertexInputData);
+ }
+ if (conf.hasEdgeInputFormat()) {
+ FileUtils.writeLines(edgeInputFile, edgeInputData);
+ }
- int localZookeeperPort = findAvailablePort();
+ int localZookeeperPort = findAvailablePort();
- conf.setWorkerConfiguration(1, 1, 100.0f);
- GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
- GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
- conf.setZookeeperList("localhost:" +
+ conf.setWorkerConfiguration(1, 1, 100.0f);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
+ GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
+ conf.setZookeeperList("localhost:" +
String.valueOf(localZookeeperPort));
- conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
- GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
- zkMgrDir.toString());
- GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
+ conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
+ GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
+ zkMgrDir.toString());
- // Create and configure the job to run the vertex
- GiraphJob job = new GiraphJob(conf, conf.getComputationName());
+ if (checkpointsDir == null) {
+ checkpointsDir = FileUtils.createTempDir(
+ tmpDir, "_checkpoints").toString();
+ }
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
- Job internalJob = job.getInternalJob();
- if (conf.hasVertexInputFormat()) {
- GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
- new Path(vertexInputFile.toString()));
- }
- if (conf.hasEdgeInputFormat()) {
- GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
- new Path(edgeInputFile.toString()));
- }
- FileOutputFormat.setOutputPath(job.getInternalJob(),
- new Path(outputDir.toString()));
+ // Create and configure the job to run the vertex
+ GiraphJob job = new GiraphJob(conf, conf.getComputationName());
- // Configure a local zookeeper instance
- Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
+ Job internalJob = job.getInternalJob();
+ if (conf.hasVertexInputFormat()) {
+ GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
+ new Path(vertexInputFile.toString()));
+ }
+ if (conf.hasEdgeInputFormat()) {
+ GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
+ new Path(edgeInputFile.toString()));
+ }
+ FileOutputFormat.setOutputPath(job.getInternalJob(),
+ new Path(outputDir.toString()));
- QuorumPeerConfig qpConfig = new QuorumPeerConfig();
- qpConfig.parseProperties(zkProperties);
+ // Configure a local zookeeper instance
+ Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
- boolean success = runZooKeeperAndJob(qpConfig, job);
- if (!success) {
- return null;
- }
+ QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+ qpConfig.parseProperties(zkProperties);
- File outFile = new File(outputDir, "part-m-00000");
- if (conf.hasVertexOutputFormat() && outFile.canRead()) {
- return Files.readLines(outFile, Charsets.UTF_8);
- } else {
- return ImmutableList.of();
- }
- } finally {
- FileUtils.delete(tmpDir);
+ boolean success = runZooKeeperAndJob(qpConfig, job);
+ if (!success) {
+ return null;
}
+
+ File outFile = new File(outputDir, "part-m-00000");
+ if (conf.hasVertexOutputFormat() && outFile.canRead()) {
+ return Files.readLines(outFile, Charsets.UTF_8);
+ } else {
+ return ImmutableList.of();
+ }
+
}
/**
@@ -236,42 +262,97 @@ public class InternalVertexRunner {
E extends Writable> void run(
GiraphConfiguration conf,
TestGraph<I, V, E> graph) throws Exception {
- File tmpDir = null;
+ // Prepare temporary folders
+ File tmpDir = FileUtils.createTestDir(conf.getComputationName());
try {
- // Prepare temporary folders
- tmpDir = FileUtils.createTestDir(conf.getComputationName());
+ run(conf, graph, tmpDir, null);
+ } finally {
+ FileUtils.delete(tmpDir);
+ }
+ }
- File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
- File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
- File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
+ /**
+ * Attempts to run the vertex internally in the current JVM,
+ * reading from an in-memory graph. Will start its own zookeeper
+ * instance.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param conf GiraphClasses specifying which types to use
+ * @param graph input graph
+ * @param tmpDir file path for storing temporary files.
+ * @param checkpointsDir if set, will use this folder for storing
+ * checkpoints; if null, a "_checkpoints" folder is created under tmpDir.
+ * @throws Exception if anything goes wrong
+ */
+ public static <I extends WritableComparable,
+ V extends Writable,
+ E extends Writable> void run(
+ GiraphConfiguration conf,
+ TestGraph<I, V, E> graph,
+ File tmpDir,
+ String checkpointsDir) throws Exception {
+ File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
+ File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
+
+ if (checkpointsDir == null) {
+ checkpointsDir = FileUtils.
+ createTempDir(tmpDir, "_checkpoints").toString();
+ }
- conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
+ conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
- // Create and configure the job to run the vertex
- GiraphJob job = new GiraphJob(conf, conf.getComputationName());
+ // Create and configure the job to run the vertex
+ GiraphJob job = new GiraphJob(conf, conf.getComputationName());
- InMemoryVertexInputFormat.setGraph(graph);
+ InMemoryVertexInputFormat.setGraph(graph);
- int localZookeeperPort = findAvailablePort();
+ int localZookeeperPort = findAvailablePort();
- conf.setWorkerConfiguration(1, 1, 100.0f);
- GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
- GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
- GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" +
+ conf.setWorkerConfiguration(1, 1, 100.0f);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
+ GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
+ GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" +
String.valueOf(localZookeeperPort));
- conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
- GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
- zkMgrDir.toString());
- GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
+ conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
+ GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
+ zkMgrDir.toString());
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
+
+ // Configure a local zookeeper instance
+ Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
+
+ QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+ qpConfig.parseProperties(zkProperties);
- // Configure a local zookeeper instance
- Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
+ runZooKeeperAndJob(qpConfig, job);
- QuorumPeerConfig qpConfig = new QuorumPeerConfig();
- qpConfig.parseProperties(zkProperties);
+ }
- runZooKeeperAndJob(qpConfig, job);
+ /**
+ * Attempts to run the vertex internally in the current JVM, reading and
+ * writing to an in-memory graph. Will start its own zookeeper
+ * instance.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param conf GiraphClasses specifying which types to use
+ * @param graph input graph
+ * @return Output graph
+ * @throws Exception if anything goes wrong
+ */
+ public static <I extends WritableComparable,
+ V extends Writable,
+ E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
+ GiraphConfiguration conf,
+ TestGraph<I, V, E> graph) throws Exception {
+ // Prepare temporary folders
+ File tmpDir = FileUtils.createTestDir(conf.getComputationName());
+ try {
+ return runWithInMemoryOutput(conf, graph, tmpDir, null);
} finally {
FileUtils.delete(tmpDir);
}
@@ -287,6 +368,9 @@ public class InternalVertexRunner {
* @param <E> Edge Value
* @param conf GiraphClasses specifying which types to use
* @param graph input graph
+ * @param tmpDir file path for storing temporary files.
+ * @param checkpointsDir if set, will use this folder for storing
+ * checkpoints; if null, a "_checkpoints" folder is created under tmpDir.
* @return Output graph
* @throws Exception if anything goes wrong
*/
@@ -294,10 +378,12 @@ public class InternalVertexRunner {
V extends Writable,
E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
GiraphConfiguration conf,
- TestGraph<I, V, E> graph) throws Exception {
+ TestGraph<I, V, E> graph,
+ File tmpDir,
+ String checkpointsDir) throws Exception {
conf.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class);
InMemoryVertexOutputFormat.initializeOutputGraph(conf);
- InternalVertexRunner.run(conf, graph);
+ InternalVertexRunner.run(conf, graph, tmpDir, checkpointsDir);
return InMemoryVertexOutputFormat.getOutputGraph();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
index af45426..a5b1567 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
@@ -65,6 +65,6 @@ public class ExtendedDataInputOutput extends DataInputOutput {
@Override
public void readFields(DataInput in) throws IOException {
- WritableUtils.readExtendedDataOutput(in, conf);
+ dataOutput = WritableUtils.readExtendedDataOutput(in, conf);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index de7af28..0d90a59 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -70,10 +70,13 @@ import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.log4j.Level;
@@ -92,9 +95,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import net.iharder.Base64;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
@@ -110,6 +111,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
/**
@@ -566,7 +568,7 @@ public class BspServiceWorker<I extends WritableComparable,
Collection<? extends PartitionOwner> masterSetPartitionOwners =
startSuperstep();
workerGraphPartitioner.updatePartitionOwners(
- getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+ getWorkerInfo(), masterSetPartitionOwners);
/*if[HADOOP_NON_SECURE]
workerClient.setup();
@@ -1370,73 +1372,42 @@ public class BspServiceWorker<I extends WritableComparable,
// Algorithm:
// For each partition, dump vertices and messages
Path metadataFilePath =
- new Path(getCheckpointBasePath(getSuperstep()) + "." +
- getHostnamePartitionId() +
- CHECKPOINT_METADATA_POSTFIX);
- Path verticesFilePath =
- new Path(getCheckpointBasePath(getSuperstep()) + "." +
- getHostnamePartitionId() +
- CHECKPOINT_VERTICES_POSTFIX);
+ createCheckpointFilePathSafe(CHECKPOINT_METADATA_POSTFIX);
Path validFilePath =
- new Path(getCheckpointBasePath(getSuperstep()) + "." +
- getHostnamePartitionId() +
- CHECKPOINT_VALID_POSTFIX);
+ createCheckpointFilePathSafe(CHECKPOINT_VALID_POSTFIX);
+ Path checkpointFilePath =
+ createCheckpointFilePathSafe(CHECKPOINT_DATA_POSTFIX);
- // Remove these files if they already exist (shouldn't though, unless
- // of previous failure of this worker)
- if (getFs().delete(validFilePath, false)) {
- LOG.warn("storeCheckpoint: Removed valid file " +
- validFilePath);
- }
- if (getFs().delete(metadataFilePath, false)) {
- LOG.warn("storeCheckpoint: Removed metadata file " +
- metadataFilePath);
- }
- if (getFs().delete(verticesFilePath, false)) {
- LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
- }
- FSDataOutputStream verticesOutputStream =
- getFs().create(verticesFilePath);
- ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
- DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
- for (Integer partitionId : getPartitionStore().getPartitionIds()) {
- Partition<I, V, E> partition =
- getPartitionStore().getOrCreatePartition(partitionId);
- long startPos = verticesOutputStream.getPos();
- partition.write(verticesOutputStream);
- // write messages
- getServerData().getCurrentMessageStore().writePartition(
- verticesOutputStream, partition.getId());
- // Write the metadata for this partition
- // Format:
- // <index count>
- // <index 0 start pos><partition id>
- // <index 1 start pos><partition id>
- metadataOutput.writeLong(startPos);
- metadataOutput.writeInt(partition.getId());
- if (LOG.isDebugEnabled()) {
- LOG.debug("storeCheckpoint: Vertex file starting " +
- "offset = " + startPos + ", length = " +
- (verticesOutputStream.getPos() - startPos) +
- ", partition = " + partition.toString());
- }
- getPartitionStore().putPartition(partition);
- getContext().progress();
- }
// Metadata is buffered and written at the end since it's small and
// needs to know how many partitions this worker owns
FSDataOutputStream metadataOutputStream =
getFs().create(metadataFilePath);
metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
- metadataOutputStream.write(metadataByteStream.toByteArray());
+
+ for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+ metadataOutputStream.writeInt(partitionId);
+ }
metadataOutputStream.close();
- verticesOutputStream.close();
- if (LOG.isInfoEnabled()) {
- LOG.info("storeCheckpoint: Finished metadata (" +
- metadataFilePath + ") and vertices (" + verticesFilePath + ").");
+
+ storeCheckpointVertices();
+
+ FSDataOutputStream checkpointOutputStream =
+ getFs().create(checkpointFilePath);
+ workerContext.write(checkpointOutputStream);
+ getContext().progress();
+
+ for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+ // write messages
+ checkpointOutputStream.writeInt(partitionId);
+ getServerData().getCurrentMessageStore().writePartition(
+ checkpointOutputStream, partitionId);
+ getContext().progress();
+
}
+ checkpointOutputStream.close();
+
getFs().createNewFile(validFilePath);
// Notify master that checkpoint is stored
@@ -1462,116 +1433,247 @@ public class BspServiceWorker<I extends WritableComparable,
}
}
+ /**
+ * Build a checkpoint file path safely: if the file already exists, remove it.
+ * @param name file extension
+ * @return full path for the checkpoint file (the file itself is not created)
+ * @throws IOException if the file system operation fails
+ */
+ private Path createCheckpointFilePathSafe(String name) throws IOException {
+ Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + "." +
+ getTaskPartition() + name);
+ // Remove these files if they already exist (shouldn't though, unless
+ // of previous failure of this worker)
+ if (getFs().delete(validFilePath, false)) {
+ LOG.warn("storeCheckpoint: Removed " + name + " file " +
+ validFilePath);
+ }
+ return validFilePath;
+ }
+
+ /**
+ * Returns path to saved checkpoint.
+ * Doesn't check if file actually exists.
+ * @param superstep saved superstep.
+ * @param name extension name
+ * @return full file path to the checkpoint file
+ */
+ private Path getSavedCheckpoint(long superstep, String name) {
+ return new Path(getSavedCheckpointBasePath(superstep) + "." +
+ getTaskPartition() + name);
+ }
+
+ /**
+ * Save partitions, one file per partition. To speed up
+ * this operation, it runs in multiple threads.
+ */
+ private void storeCheckpointVertices() {
+ final int numPartitions = getPartitionStore().getNumPartitions();
+ int numThreads = Math.min(
+ GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
+ numPartitions);
+
+ final Queue<Integer> partitionIdQueue =
+ (numPartitions == 0) ? new LinkedList<Integer>() :
+ new ArrayBlockingQueue<Integer>(numPartitions);
+ Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+
+ final CompressionCodec codec =
+ new CompressionCodecFactory(getConfiguration())
+ .getCodecByClassName(
+ GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
+ .get(getConfiguration()));
+
+ long t0 = System.currentTimeMillis();
+
+ CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ @Override
+ public Callable<Void> newCallable(int callableId) {
+ return new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ while (!partitionIdQueue.isEmpty()) {
+ Integer partitionId = partitionIdQueue.poll();
+ if (partitionId == null) {
+ break;
+ }
+ Path path =
+ createCheckpointFilePathSafe("_" + partitionId +
+ CHECKPOINT_VERTICES_POSTFIX);
+
+ FSDataOutputStream uncompressedStream =
+ getFs().create(path);
+
+
+ DataOutputStream stream = codec == null ? uncompressedStream :
+ new DataOutputStream(
+ codec.createOutputStream(uncompressedStream));
+
+ Partition<I, V, E> partition =
+ getPartitionStore().getOrCreatePartition(partitionId);
+
+ partition.write(stream);
+
+ getPartitionStore().putPartition(partition);
+
+ stream.close();
+ uncompressedStream.close();
+ }
+ return null;
+ }
+
+
+ };
+ }
+ };
+
+ ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+ "checkpoint-vertices-%d", getContext());
+
+ LOG.info("Save checkpoint in " + (System.currentTimeMillis() - t0) +
+ " ms, using " + numThreads + " threads");
+ }
+
+ /**
+ * Load saved partitions in multiple threads.
+ * @param superstep superstep to load
+ * @param partitions list of partitions to load
+ */
+ private void loadCheckpointVertices(final long superstep,
+ List<Integer> partitions) {
+ int numThreads = Math.min(
+ GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
+ partitions.size());
+
+ final Queue<Integer> partitionIdQueue =
+ new ConcurrentLinkedQueue<>(partitions);
+
+ final CompressionCodec codec =
+ new CompressionCodecFactory(getConfiguration())
+ .getCodecByClassName(
+ GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
+ .get(getConfiguration()));
+
+ long t0 = System.currentTimeMillis();
+
+ CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ @Override
+ public Callable<Void> newCallable(int callableId) {
+ return new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ while (!partitionIdQueue.isEmpty()) {
+ Integer partitionId = partitionIdQueue.poll();
+ if (partitionId == null) {
+ break;
+ }
+ Path path =
+ getSavedCheckpoint(superstep, "_" + partitionId +
+ CHECKPOINT_VERTICES_POSTFIX);
+
+ FSDataInputStream compressedStream =
+ getFs().open(path);
+
+ DataInputStream stream = codec == null ? compressedStream :
+ new DataInputStream(
+ codec.createInputStream(compressedStream));
+
+ Partition<I, V, E> partition =
+ getConfiguration().createPartition(partitionId, getContext());
+
+ partition.readFields(stream);
+
+ getPartitionStore().addPartition(partition);
+
+ stream.close();
+ }
+ return null;
+ }
+
+ };
+ }
+ };
+
+ ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+ "load-vertices-%d", getContext());
+
+ LOG.info("Loaded checkpoint in " + (System.currentTimeMillis() - t0) +
+ " ms, using " + numThreads + " threads");
+ }
+
@Override
public VertexEdgeCount loadCheckpoint(long superstep) {
- try {
- // clear old message stores
- getServerData().getIncomingMessageStore().clearAll();
- getServerData().getCurrentMessageStore().clearAll();
- } catch (IOException e) {
- throw new RuntimeException(
- "loadCheckpoint: Failed to clear message stores ", e);
- }
+ Path metadataFilePath =
+ getSavedCheckpoint(superstep, CHECKPOINT_METADATA_POSTFIX);
+ Path checkpointFilePath =
+ getSavedCheckpoint(superstep, CHECKPOINT_DATA_POSTFIX);
// Algorithm:
// Examine all the partition owners and load the ones
// that match my hostname and id from the master designated checkpoint
// prefixes.
- long startPos = 0;
- int loadedPartitions = 0;
- for (PartitionOwner partitionOwner :
- workerGraphPartitioner.getPartitionOwners()) {
- if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
- String metadataFile =
- partitionOwner.getCheckpointFilesPrefix() +
- CHECKPOINT_METADATA_POSTFIX;
- String partitionsFile =
- partitionOwner.getCheckpointFilesPrefix() +
- CHECKPOINT_VERTICES_POSTFIX;
- try {
- int partitionId = -1;
- DataInputStream metadataStream =
- getFs().open(new Path(metadataFile));
- int partitions = metadataStream.readInt();
- for (int i = 0; i < partitions; ++i) {
- startPos = metadataStream.readLong();
- partitionId = metadataStream.readInt();
- if (partitionId == partitionOwner.getPartitionId()) {
- break;
- }
- }
- if (partitionId != partitionOwner.getPartitionId()) {
- throw new IllegalStateException(
- "loadCheckpoint: " + partitionOwner +
- " not found!");
- }
- metadataStream.close();
- Partition<I, V, E> partition =
- getConfiguration().createPartition(partitionId, getContext());
- DataInputStream partitionsStream =
- getFs().open(new Path(partitionsFile));
- if (partitionsStream.skip(startPos) != startPos) {
- throw new IllegalStateException(
- "loadCheckpoint: Failed to skip " + startPos +
- " on " + partitionsFile);
- }
- partition.readFields(partitionsStream);
- getServerData().getIncomingMessageStore().readFieldsForPartition(
- partitionsStream, partitionId);
- partitionsStream.close();
- if (LOG.isInfoEnabled()) {
- LOG.info("loadCheckpoint: Loaded partition " +
- partition);
- }
- if (getPartitionStore().hasPartition(partitionId)) {
- throw new IllegalStateException(
- "loadCheckpoint: Already has partition owner " +
- partitionOwner);
- }
- getPartitionStore().addPartition(partition);
- getContext().progress();
- ++loadedPartitions;
- } catch (IOException e) {
- throw new RuntimeException(
- "loadCheckpoint: Failed to get partition owner " +
- partitionOwner, e);
- }
+ try {
+ DataInputStream metadataStream =
+ getFs().open(metadataFilePath);
+
+ int partitions = metadataStream.readInt();
+ List<Integer> partitionIds = new ArrayList<>(partitions);
+ for (int i = 0; i < partitions; i++) {
+ int partitionId = metadataStream.readInt();
+ partitionIds.add(partitionId);
}
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
- " partitions of out " +
- workerGraphPartitioner.getPartitionOwners().size() +
- " total.");
- }
- // Load global stats and superstep classes
- GlobalStats globalStats = new GlobalStats();
- SuperstepClasses superstepClasses = new SuperstepClasses();
- String finalizedCheckpointPath =
- getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
- try {
+ loadCheckpointVertices(superstep, partitionIds);
+
+ getContext().progress();
+
+ metadataStream.close();
+
+ DataInputStream checkpointStream =
+ getFs().open(checkpointFilePath);
+ workerContext.readFields(checkpointStream);
+
+ // Load global stats and superstep classes
+ GlobalStats globalStats = new GlobalStats();
+ SuperstepClasses superstepClasses = new SuperstepClasses();
+ String finalizedCheckpointPath =
+ getSavedCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
DataInputStream finalizedStream =
getFs().open(new Path(finalizedCheckpointPath));
globalStats.readFields(finalizedStream);
superstepClasses.readFields(finalizedStream);
getConfiguration().updateSuperstepClasses(superstepClasses);
- } catch (IOException e) {
- throw new IllegalStateException(
- "loadCheckpoint: Failed to load global stats and superstep classes",
- e);
- }
+ getServerData().resetMessageStores();
- getServerData().prepareSuperstep();
- // Communication service needs to setup the connections prior to
- // processing vertices
+ for (int i = 0; i < partitions; i++) {
+ int partitionId = checkpointStream.readInt();
+ getServerData().getCurrentMessageStore().readFieldsForPartition(
+ checkpointStream, partitionId);
+ }
+ checkpointStream.close();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadCheckpoint: Loaded " +
+ workerGraphPartitioner.getPartitionOwners().size() +
+ " total.");
+ }
+
+ // Communication service needs to setup the connections prior to
+ // processing vertices
/*if[HADOOP_NON_SECURE]
workerClient.setup();
else[HADOOP_NON_SECURE]*/
- workerClient.setup(getConfiguration().authenticate());
+ workerClient.setup(getConfiguration().authenticate());
/*end[HADOOP_NON_SECURE]*/
- return new VertexEdgeCount(globalStats.getVertexCount(),
- globalStats.getEdgeCount());
+ return new VertexEdgeCount(globalStats.getVertexCount(),
+ globalStats.getEdgeCount());
+
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "loadCheckpoint: Failed for superstep=" + superstep, e);
+ }
}
/**
@@ -1651,7 +1753,7 @@ else[HADOOP_NON_SECURE]*/
// 5. Add the partitions to myself.
PartitionExchange partitionExchange =
workerGraphPartitioner.updatePartitionOwners(
- getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+ getWorkerInfo(), masterSetPartitionOwners);
workerClient.openConnections();
Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index 29835c5..aca9944 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -25,6 +25,9 @@ import org.apache.giraph.graph.GraphState;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.List;
/**
@@ -33,9 +36,8 @@ import java.util.List;
*/
@SuppressWarnings("rawtypes")
public abstract class WorkerContext
- extends DefaultImmutableClassesGiraphConfigurable
- implements WorkerAggregatorUsage {
-
+ extends DefaultImmutableClassesGiraphConfigurable
+ implements WorkerAggregatorUsage, Writable {
/** Global graph state */
private GraphState graphState;
/** Worker aggregator usage */
@@ -203,4 +205,12 @@ public abstract class WorkerContext
public <A extends Writable> A getAggregatedValue(String name) {
return workerAggregatorUsage.<A>getAggregatedValue(name);
}
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
index 96bd5d7..57bebbd 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
@@ -66,7 +66,7 @@ public class SimpleRangePartitionFactoryTest {
WorkerGraphPartitioner<LongWritable, Writable, Writable> workerPartitioner =
factory.createWorkerGraphPartitioner();
- workerPartitioner.updatePartitionOwners(null, owners, null);
+ workerPartitioner.updatePartitionOwners(null, owners);
LongWritable longWritable = new LongWritable();
int[] partitions = new int[keySpaceSize];
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
new file mode 100644
index 0000000..387b937
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.examples.SimpleSuperstepComputation;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that worker context and master computation
+ * are properly saved and loaded back at checkpoint.
+ */
+public class TestCheckpointing extends BspCase {
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(TestCheckpointing.class);
+  /** ID to be used with test job */
+  public static final String TEST_JOB_ID = "test_job";
+  /**
+   * Compute fails if it is asked to run a superstep earlier than the one
+   * specified by this key. That way we ensure that the computation actually
+   * restarted from the checkpoint instead of recomputing from the
+   * beginning.
+   */
+  public static final String KEY_MIN_SUPERSTEP = "minimum.superstep";
+
+  /**
+   * Create the test case
+   */
+  public TestCheckpointing() {
+    super(TestCheckpointing.class.getName());
+  }
+
+  /**
+   * Runs a job that checkpoints every two supersteps, restarts it from the
+   * superstep 2 checkpoint and, when running locally, checks that the
+   * restarted run ends with the same aggregated sum.
+   *
+   * @throws IOException if the job cannot be set up or run
+   * @throws InterruptedException if interrupted while waiting for a job
+   * @throws ClassNotFoundException if a job class cannot be loaded
+   */
+  @Test
+  public void testBspCheckpoint()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Path checkpointsDir = getTempPath("checkpointing");
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(CheckpointComputation.class);
+    conf.setWorkerContextClass(CheckpointVertexWorkerContext.class);
+    conf.setMasterComputeClass(CheckpointVertexMasterCompute.class);
+    conf.setVertexInputFormatClass(
+        SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(
+        SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
+    conf.set("mapred.job.id", TEST_JOB_ID);
+    conf.set(KEY_MIN_SUPERSTEP, "0");
+    GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
+
+    GiraphConfiguration configuration = job.getConfiguration();
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration,
+        checkpointsDir.toString());
+    GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration,
+        false);
+    configuration.setCheckpointFrequency(2);
+
+    assertTrue(job.run(true));
+
+    long idSum = 0;
+    if (!runningInDistributedMode()) {
+      FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
+          outputPath);
+      idSum = CheckpointVertexWorkerContext.getFinalSum();
+      LOG.info("testBspCheckpoint: idSum = " + idSum +
+          " fileLen = " + fileStatus.getLen());
+    }
+
+    // Restart the test from superstep 2
+    LOG.info("testBspCheckpoint: Restarting from superstep 2" +
+        " with checkpoint path = " + checkpointsDir);
+    outputPath = getTempPath("checkpointing_restarted");
+
+    GiraphConstants.RESTART_JOB_ID.set(conf, TEST_JOB_ID);
+    conf.set("mapred.job.id", "restarted_test_job");
+    conf.set(GiraphConstants.RESTART_SUPERSTEP, "2");
+    conf.set(KEY_MIN_SUPERSTEP, "2");
+
+    GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+        conf, outputPath);
+
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
+        checkpointsDir.toString());
+
+    // The final sum lives in a static field that the first run has already
+    // set; overwrite it with a sentinel so the comparison below fails if the
+    // restarted run never records its own sum.
+    CheckpointVertexWorkerContext.setFinalSum(Long.MIN_VALUE);
+    assertTrue(restartedJob.run(true));
+    if (!runningInDistributedMode()) {
+      long idSumRestarted = CheckpointVertexWorkerContext.getFinalSum();
+      LOG.info("testBspCheckpoint: idSumRestarted = " + idSumRestarted);
+      assertEquals(idSum, idSumRestarted);
+    }
+  }
+
+  /**
+   * Computation that checks the checkpointed worker context state, fails on
+   * supersteps below {@link TestCheckpointing#KEY_MIN_SUPERSTEP}, aggregates
+   * vertex ids up to superstep 4 and then votes to halt.
+   */
+  public static class CheckpointComputation extends
+      BasicComputation<LongWritable, IntWritable, FloatWritable,
+      FloatWritable> {
+    @Override
+    public void compute(
+        Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+        Iterable<FloatWritable> messages) throws IOException {
+      CheckpointVertexWorkerContext workerContext = getWorkerContext();
+      // preSuperstep() increments testValue once per superstep, so a
+      // correctly restored worker context is one ahead of the superstep here.
+      assertEquals(getSuperstep() + 1, workerContext.testValue);
+
+      if (getSuperstep() <
+          getConf().getInt(KEY_MIN_SUPERSTEP, Integer.MAX_VALUE)) {
+        fail("Should not be running compute on superstep " + getSuperstep());
+      }
+
+      if (getSuperstep() > 4) {
+        vertex.voteToHalt();
+        return;
+      }
+
+      aggregate(LongSumAggregator.class.getName(),
+          new LongWritable(vertex.getId().get()));
+
+      float msgValue = 0.0f;
+      for (FloatWritable message : messages) {
+        msgValue += message.get();
+      }
+
+      int vertexValue = vertex.getValue().get();
+      vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
+      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+        FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
+            (float) vertexValue);
+        Edge<LongWritable, FloatWritable> newEdge =
+            EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
+        vertex.addEdge(newEdge);
+        sendMessage(edge.getTargetVertexId(), newEdgeValue);
+      }
+    }
+  }
+
+  /**
+   * Worker context that counts started supersteps in a field which is
+   * written to and read back from checkpoints, and records the final
+   * aggregated sum for the test to inspect.
+   */
+  public static class CheckpointVertexWorkerContext
+      extends DefaultWorkerContext {
+    /** User can access this after the application finishes if local */
+    private static long finalSum;
+
+    /** Number of supersteps started; saved and restored with checkpoints */
+    private int testValue;
+
+    /**
+     * Get the final sum recorded by the last application that finished.
+     *
+     * @return final aggregated sum
+     */
+    public static long getFinalSum() {
+      return finalSum;
+    }
+
+    @Override
+    public void postApplication() {
+      setFinalSum(this.<LongWritable>getAggregatedValue(
+          LongSumAggregator.class.getName()).get());
+      LOG.info("FINAL_SUM=" + finalSum);
+    }
+
+    /**
+     * Set the final sum
+     *
+     * @param value sum
+     */
+    private static void setFinalSum(long value) {
+      finalSum = value;
+    }
+
+    @Override
+    public void preSuperstep() {
+      assertEquals(getSuperstep(), testValue++);
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      super.readFields(dataInput);
+      testValue = dataInput.readInt();
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      super.write(dataOutput);
+      dataOutput.writeInt(testValue);
+    }
+  }
+
+  /**
+   * Master compute that counts supersteps in a field which is written to
+   * and read back from checkpoints, and registers the sum aggregator.
+   */
+  public static class CheckpointVertexMasterCompute extends
+      DefaultMasterCompute {
+    /** Number of supersteps computed; saved and restored with checkpoints */
+    private int testValue;
+
+    @Override
+    public void compute() {
+      long superstep = getSuperstep();
+      assertEquals(superstep, testValue++);
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(LongSumAggregator.class.getName(),
+          LongSumAggregator.class);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      testValue = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      out.writeInt(testValue);
+    }
+  }
+}