Posted to commits@giraph.apache.org by ma...@apache.org on 2014/07/16 19:30:59 UTC

git commit: updated refs/heads/trunk to 02d9e6c

Repository: giraph
Updated Branches:
  refs/heads/trunk 0a9017716 -> 02d9e6c25


GIRAPH-924: Fix checkpointing (edunov via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/02d9e6c2
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/02d9e6c2
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/02d9e6c2

Branch: refs/heads/trunk
Commit: 02d9e6c2533a3cd108f5e6feaf40f26e95deb64c
Parents: 0a90177
Author: Maja Kabiljo <ma...@fb.com>
Authored: Wed Jul 16 10:30:30 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Wed Jul 16 10:30:30 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../giraph/aggregators/AggregatorWrapper.java   |   6 +-
 .../java/org/apache/giraph/bsp/BspService.java  |  24 ++
 .../java/org/apache/giraph/comm/ServerData.java |  17 +
 .../org/apache/giraph/conf/GiraphConstants.java |  19 +
 .../apache/giraph/master/BspServiceMaster.java  | 179 ++++----
 .../giraph/partition/BasicPartitionOwner.java   |  10 -
 .../giraph/partition/HashMasterPartitioner.java |   6 +
 .../giraph/partition/HashWorkerPartitioner.java |   5 +-
 .../partition/MasterGraphPartitioner.java       |   7 +
 .../giraph/partition/PartitionBalancer.java     |   4 +-
 .../apache/giraph/partition/PartitionOwner.java |  16 -
 .../partition/SimpleMasterPartitioner.java      |   6 +
 .../partition/SimpleWorkerPartitioner.java      |   6 +-
 .../partition/WorkerGraphPartitioner.java       |   5 +-
 .../giraph/utils/InternalVertexRunner.java      | 260 ++++++++----
 .../utils/io/ExtendedDataInputOutput.java       |   2 +-
 .../apache/giraph/worker/BspServiceWorker.java  | 404 ++++++++++++-------
 .../org/apache/giraph/worker/WorkerContext.java |  16 +-
 .../SimpleRangePartitionFactoryTest.java        |   2 +-
 .../org/apache/giraph/TestCheckpointing.java    | 266 ++++++++++++
 21 files changed, 882 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7287490..4207339 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-924: Fix checkpointing (edunov via majakabiljo)
+
   GIRAPH-921: Create ByteValueVertex to store vertex values as bytes without object instance (akyrola via majakabiljo)
 
   GIRAPH-929: setIfUnset for EnumConfOption (pavanka)  

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
index 9613805..7150402 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
@@ -44,13 +44,9 @@ public class AggregatorWrapper<A extends Writable> {
    * @param persistent      False iff aggregator should be reset at the end of
    *                        each super step
    * @param conf            Configuration
-   * @throws IllegalAccessException
-   * @throws InstantiationException
    */
   public AggregatorWrapper(Class<? extends Aggregator<A>> aggregatorClass,
-      boolean persistent, ImmutableClassesGiraphConfiguration conf) throws
-      IllegalAccessException,
-      InstantiationException {
+      boolean persistent, ImmutableClassesGiraphConfiguration conf) {
     this.persistent = persistent;
     currentAggregator = ReflectionUtils.newInstance(aggregatorClass, conf);
     changed = false;

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 2e35373..02577b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -54,6 +54,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
+import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID;
 
 /**
  * Zookeeper-based implementation of {@link CentralizedService}.
@@ -198,6 +199,11 @@ public abstract class BspService<I extends WritableComparable,
    */
   public static final String CHECKPOINT_VALID_POSTFIX = ".valid";
   /**
+   * If at the end of a checkpoint file,
+   * indicates that we store WorkerContext and aggregator handler data.
+   */
+  public static final String CHECKPOINT_DATA_POSTFIX = ".data";
+  /**
    * If at the end of a checkpoint file, indicates the stitched checkpoint
    * file prefixes.  A checkpoint is not valid if this file does not exist.
    */
@@ -226,6 +232,8 @@ public abstract class BspService<I extends WritableComparable,
   protected final String cleanedUpPath;
   /** Path to the checkpoint's root (including job id) */
   protected final String checkpointBasePath;
+  /** Saved checkpoint's root path, used when restarting another job */
+  protected final String savedCheckpointBasePath;
   /** Path to the master election path */
   protected final String masterElectionPath;
   /** Stores progress info of this worker */
@@ -350,6 +358,12 @@ public abstract class BspService<I extends WritableComparable,
         EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
     applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
     cleanedUpPath = basePath + CLEANED_UP_DIR;
+
+    String restartJobId = RESTART_JOB_ID.get(conf);
+    savedCheckpointBasePath =
+        CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
+            CHECKPOINT_DIRECTORY.getDefaultValue() + "/" +
+                (restartJobId == null ? getJobId() : restartJobId));
     checkpointBasePath =
         CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
             CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
@@ -575,6 +589,16 @@ public abstract class BspService<I extends WritableComparable,
   }
 
   /**
+   * When restarting from another job's checkpoint, this gives the path
+   * to the saved checkpoint.
+   * @param superstep superstep to use
+   * @return Directory path for the restarted job, based on the superstep
+   */
+  public final String getSavedCheckpointBasePath(long superstep) {
+    return savedCheckpointBasePath + "/" + superstep;
+  }
+
+  /**
    * Get the checkpoint from a finalized checkpoint path
    *
    * @param finalizedPath Path of the finalized checkpoint

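For orientation, a sketch of how these paths resolve, assuming the default
checkpoint directory and a hypothetical restart job id "job_001" (the real
value comes from giraph.restart.jobId):

    checkpointBasePath            = <checkpoint dir>/<current job id>
    savedCheckpointBasePath       = <checkpoint dir>/job_001
    getSavedCheckpointBasePath(9) = <checkpoint dir>/job_001/9

When giraph.restart.jobId is unset, savedCheckpointBasePath falls back to the
current job id and the two base paths coincide.
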
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 85bfe04..036510e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -161,6 +161,23 @@ public class ServerData<I extends WritableComparable,
     return (MessageStore<I, M>) currentMessageStore;
   }
 
+  /**
+   * Re-initialize message stores.
+   * Discards old values if any.
+   * @throws IOException
+   */
+  public void resetMessageStores() throws IOException {
+    if (currentMessageStore != null) {
+      currentMessageStore.clearAll();
+      currentMessageStore = null;
+    }
+    if (incomingMessageStore != null) {
+      incomingMessageStore.clearAll();
+      incomingMessageStore = null;
+    }
+    prepareSuperstep();
+  }
+
   /** Prepare for next super step */
   public void prepareSuperstep() {
     if (currentMessageStore != null) {

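A hypothetical call shape for the new method on worker restart (the actual
call site is in BspServiceWorker elsewhere in this patch):

    // Drop any stale current/incoming stores; prepareSuperstep() then
    // rebuilds empty ones before checkpointed messages are re-read.
    serverData.resetMessageStores();
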
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 81c0e0b..3d16e9c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -880,6 +880,13 @@ public interface GiraphConstants {
   String RESTART_SUPERSTEP = "giraph.restartSuperstep";
 
   /**
+   * If the application is restarted manually, we need to specify the job ID
+   * to restart from.
+   */
+  StrConfOption RESTART_JOB_ID = new StrConfOption("giraph.restart.jobId",
+      null, "Which job ID should I try to restart?");
+
+  /**
    * Base ZNode for Giraph's state in the ZooKeeper cluster.  Must be a root
    * znode on the cluster beginning with "/"
    */
@@ -1124,5 +1131,17 @@ public interface GiraphConstants {
   IntConfOption HDFS_FILE_CREATION_RETRY_WAIT_MS =
       new IntConfOption("giraph.hdfs.file.creation.retry.wait.ms", 30_000,
           "Milliseconds to wait prior to retrying creation of an HDFS file");
+
+  /** Number of threads for writing and reading checkpoints */
+  IntConfOption NUM_CHECKPOINT_IO_THREADS =
+      new IntConfOption("giraph.checkpoint.io.threads", 8,
+          "Number of threads for writing and reading checkpoints");
+
+  /** Compression algorithm to be used for checkpointing */
+  StrConfOption CHECKPOINT_COMPRESSION_CODEC =
+      new StrConfOption("giraph.checkpoint.compression.codec",
+          "org.apache.hadoop.io.compress.DefaultCodec",
+          "Defines compression algorithm we will be using for " +
+              "storing checkpoint");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

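A minimal sketch of setting the new options programmatically; the job id and
thread count here are illustrative, not defaults:

    GiraphConfiguration conf = new GiraphConfiguration();
    // Job whose finalized checkpoint we want to restart from
    GiraphConstants.RESTART_JOB_ID.set(conf, "job_201407160000_0001");
    // More IO threads can shorten checkpoint save/load on large workers
    GiraphConstants.NUM_CHECKPOINT_IO_THREADS.set(conf, 16);
    // Any Hadoop CompressionCodec implementation can be named here
    GiraphConstants.CHECKPOINT_COMPRESSION_CODEC.set(conf,
        "org.apache.hadoop.io.compress.GzipCodec");
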
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 0275395..e129390 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.master;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
@@ -42,6 +44,7 @@ import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.partition.BasicPartitionOwner;
 import org.apache.giraph.partition.MasterGraphPartitioner;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
@@ -100,10 +103,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
@@ -272,6 +273,7 @@ public class BspServiceMaster<I extends WritableComparable,
           Ids.OPEN_ACL_UNSAFE,
           CreateMode.PERSISTENT_SEQUENTIAL,
           true);
+      LOG.info("setJobState: " + jobState);
     } catch (KeeperException.NodeExistsException e) {
       throw new IllegalStateException(
           "setJobState: Imposible that " +
@@ -740,20 +742,18 @@ public class BspServiceMaster<I extends WritableComparable,
    * finalized checkpoint file and setting it.
    *
    * @param superstep Checkpoint set to examine.
-   * @param partitionOwners Partition owners to modify with checkpoint
-   *        prefixes
    * @throws IOException
    * @throws InterruptedException
    * @throws KeeperException
+   * @return Collection of generated partition owners.
    */
-  private void prepareCheckpointRestart(
-    long superstep,
-    Collection<PartitionOwner> partitionOwners)
+  private Collection<PartitionOwner> prepareCheckpointRestart(long superstep)
     throws IOException, KeeperException, InterruptedException {
+    List<PartitionOwner> partitionOwners = new ArrayList<>();
     FileSystem fs = getFs();
-    List<Path> validMetadataPathList = new ArrayList<Path>();
     String finalizedCheckpointPath =
-        getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+        getSavedCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+    LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
     DataInputStream finalizedStream =
         fs.open(new Path(finalizedCheckpointPath));
     GlobalStats globalStats = new GlobalStats();
@@ -763,51 +763,46 @@ public class BspServiceMaster<I extends WritableComparable,
     superstepClasses.readFields(finalizedStream);
     getConfiguration().updateSuperstepClasses(superstepClasses);
     int prefixFileCount = finalizedStream.readInt();
-    for (int i = 0; i < prefixFileCount; ++i) {
-      String metadataFilePath =
-          finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX;
-      validMetadataPathList.add(new Path(metadataFilePath));
-    }
 
-    aggregatorHandler.readFields(finalizedStream);
-    masterCompute.readFields(finalizedStream);
-    finalizedStream.close();
 
-    Map<Integer, PartitionOwner> idOwnerMap =
-        new HashMap<Integer, PartitionOwner>();
-    for (PartitionOwner partitionOwner : partitionOwners) {
-      if (idOwnerMap.put(partitionOwner.getPartitionId(),
-          partitionOwner) != null) {
-        throw new IllegalStateException(
-            "prepareCheckpointRestart: Duplicate partition " +
-                partitionOwner);
-      }
+    Int2ObjectMap<WorkerInfo> workersMap = new Int2ObjectOpenHashMap<>();
+    for (WorkerInfo worker : chosenWorkerInfoList) {
+      workersMap.put(worker.getTaskId(), worker);
     }
-    // Reading the metadata files.  Simply assign each partition owner
-    // the correct file prefix based on the partition id.
-    for (Path metadataPath : validMetadataPathList) {
-      String checkpointFilePrefix = metadataPath.toString();
-      checkpointFilePrefix =
-          checkpointFilePrefix.substring(
-              0,
-              checkpointFilePrefix.length() -
-              CHECKPOINT_METADATA_POSTFIX.length());
-      DataInputStream metadataStream = fs.open(metadataPath);
+    String checkpointFile =
+        finalizedStream.readUTF();
+    for (int i = 0; i < prefixFileCount; ++i) {
+      int mrTaskId = finalizedStream.readInt();
+
+      DataInputStream metadataStream = fs.open(new Path(checkpointFile +
+          "." + mrTaskId + CHECKPOINT_METADATA_POSTFIX));
       long partitions = metadataStream.readInt();
-      for (long i = 0; i < partitions; ++i) {
-        long dataPos = metadataStream.readLong();
+      WorkerInfo worker = workersMap.get(mrTaskId);
+      for (long p = 0; p < partitions; ++p) {
         int partitionId = metadataStream.readInt();
-        PartitionOwner partitionOwner = idOwnerMap.get(partitionId);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("prepareSuperstepRestart: File " + metadataPath +
-              " with position " + dataPos +
-              ", partition id = " + partitionId +
-              " assigned to " + partitionOwner);
-        }
-        partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix);
+        PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId,
+            worker);
+        partitionOwners.add(partitionOwner);
+        LOG.info("prepareCheckpointRestart partitionId=" + partitionId +
+            " assigned to " + partitionOwner);
       }
       metadataStream.close();
     }
+    // Ordering is important: right now we rely on this ordering
+    // in WorkerGraphPartitioner
+    Collections.sort(partitionOwners, new Comparator<PartitionOwner>() {
+      @Override
+      public int compare(PartitionOwner p1, PartitionOwner p2) {
+        return Integer.compare(p1.getPartitionId(), p2.getPartitionId());
+      }
+    });
+
+
+    aggregatorHandler.readFields(finalizedStream);
+    masterCompute.readFields(finalizedStream);
+    finalizedStream.close();
+
+    return partitionOwners;
   }
 
   @Override
@@ -1085,11 +1080,9 @@ public class BspServiceMaster<I extends WritableComparable,
         getZkExt().getData(superstepFinishedNode, false, null));
 
     finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
+    finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep));
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
-      String chosenWorkerInfoPrefix =
-          getCheckpointBasePath(superstep) + "." +
-              chosenWorkerInfo.getHostnameId();
-      finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix);
+      finalizedOutputStream.writeInt(chosenWorkerInfo.getTaskId());
     }
     aggregatorHandler.write(finalizedOutputStream);
     masterCompute.write(finalizedOutputStream);
@@ -1104,18 +1097,10 @@ public class BspServiceMaster<I extends WritableComparable,
    * the workers will know how to do the exchange.  If this was a restarted
    * superstep, then make sure to provide information on where to find the
    * checkpoint file.
-   *
-   * @param allPartitionStatsList All partition stats
-   * @param chosenWorkerInfoList All the chosen worker infos
-   * @param masterGraphPartitioner Master graph partitioner
    */
-  private void assignPartitionOwners(
-      List<PartitionStats> allPartitionStatsList,
-      List<WorkerInfo> chosenWorkerInfoList,
-      MasterGraphPartitioner<I, V, E> masterGraphPartitioner) {
+  private void assignPartitionOwners() {
     Collection<PartitionOwner> partitionOwners;
-    if (getSuperstep() == INPUT_SUPERSTEP ||
-        getSuperstep() == getRestartedSuperstep()) {
+    if (getSuperstep() == INPUT_SUPERSTEP) {
       partitionOwners =
           masterGraphPartitioner.createInitialPartitionOwners(
               chosenWorkerInfoList, maxWorkers);
@@ -1123,23 +1108,10 @@ public class BspServiceMaster<I extends WritableComparable,
         throw new IllegalStateException(
             "assignAndExchangePartitions: No partition owners set");
       }
-    } else {
-      partitionOwners =
-          masterGraphPartitioner.generateChangedPartitionOwners(
-              allPartitionStatsList,
-              chosenWorkerInfoList,
-              maxWorkers,
-              getSuperstep());
-
-      PartitionUtils.analyzePartitionStats(partitionOwners,
-          allPartitionStatsList);
-    }
-    checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
-
-    // If restarted, prepare the checkpoint restart
-    if (getRestartedSuperstep() == getSuperstep()) {
+    } else if (getRestartedSuperstep() == getSuperstep()) {
+      // If restarted, prepare the checkpoint restart
       try {
-        prepareCheckpointRestart(getSuperstep(), partitionOwners);
+        partitionOwners = prepareCheckpointRestart(getSuperstep());
       } catch (IOException e) {
         throw new IllegalStateException(
             "assignPartitionOwners: IOException on preparing", e);
@@ -1151,7 +1123,21 @@ public class BspServiceMaster<I extends WritableComparable,
             "assignPartitionOwners: InteruptedException on preparing",
             e);
       }
+      masterGraphPartitioner.setPartitionOwners(partitionOwners);
+    } else {
+      partitionOwners =
+          masterGraphPartitioner.generateChangedPartitionOwners(
+              allPartitionStatsList,
+              chosenWorkerInfoList,
+              maxWorkers,
+              getSuperstep());
+
+      PartitionUtils.analyzePartitionStats(partitionOwners,
+          allPartitionStatsList);
     }
+    checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
+
+
 
     // There will be some exchange of partitions
     if (!partitionOwners.isEmpty()) {
@@ -1240,18 +1226,9 @@ public class BspServiceMaster<I extends WritableComparable,
     // 1. Remove all old input split data
     // 2. Increase the application attempt and set to the correct checkpoint
     // 3. Send command to all workers to restart their tasks
-    try {
-      getZkExt().deleteExt(vertexInputSplitsPaths.getPath(), -1,
-          true);
-      getZkExt().deleteExt(edgeInputSplitsPaths.getPath(), -1,
-          true);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(
-          "restartFromCheckpoint: InterruptedException", e);
-    } catch (KeeperException e) {
-      throw new RuntimeException(
-          "restartFromCheckpoint: KeeperException", e);
-    }
+    zkDeleteNode(vertexInputSplitsPaths.getPath());
+    zkDeleteNode(edgeInputSplitsPaths.getPath());
+
     setApplicationAttempt(getApplicationAttempt() + 1);
     setCachedSuperstep(checkpoint);
     setRestartedSuperstep(checkpoint);
@@ -1261,6 +1238,26 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   /**
+   * Safely removes a node from ZooKeeper.
+   * Ignores the case where the node has already been removed; wraps any
+   * other failure in a runtime exception.
+   * @param path path to the node to be removed.
+   */
+  private void zkDeleteNode(String path) {
+    try {
+      getZkExt().deleteExt(path, -1, true);
+    } catch (KeeperException.NoNodeException e) {
+      LOG.info("zkDeleteNode: node has already been removed " + path);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(
+          "zkDeleteNode: InterruptedException", e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(
+          "zkDeleteNode: KeeperException", e);
+    }
+  }
+
+  /**
    * Only get the finalized checkpoint files
    */
   public static class FinalizedCheckpointPathFilter implements PathFilter {
@@ -1277,7 +1274,7 @@ public class BspServiceMaster<I extends WritableComparable,
     if (lastCheckpointedSuperstep == -1) {
       try {
         FileStatus[] fileStatusArray =
-            getFs().listStatus(new Path(checkpointBasePath),
+            getFs().listStatus(new Path(savedCheckpointBasePath),
                 new FinalizedCheckpointPathFilter());
         if (fileStatusArray == null) {
           return -1;
@@ -1582,9 +1579,7 @@ public class BspServiceMaster<I extends WritableComparable,
 
     GiraphStats.getInstance().
         getCurrentWorkers().setValue(chosenWorkerInfoList.size());
-    assignPartitionOwners(allPartitionStatsList,
-        chosenWorkerInfoList,
-        masterGraphPartitioner);
+    assignPartitionOwners();
 
     // We need to finalize aggregators from previous superstep (send them to
     // worker owners) after new worker assignments

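Reconstructed from the read/write paths above, the finalized checkpoint file
now has roughly this layout (fields before the worker count are unchanged by
this patch):

    <global stats>             // GlobalStats.readFields
    <superstep classes>        // SuperstepClasses.readFields
    int  workerCount           // number of per-task metadata files
    UTF  checkpointBasePath    // getCheckpointBasePath(superstep)
    int  taskId                // repeated workerCount times
    <aggregator handler data>
    <master compute data>

On restart the master opens <base path>.<taskId>.metadata for each task id and
rebuilds partition owners from the partition ids stored there, instead of
stitching per-partition checkpoint file prefixes as before.
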
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
index 545d1af..b6cf813 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
@@ -103,16 +103,6 @@ public class BasicPartitionOwner implements PartitionOwner,
   }
 
   @Override
-  public String getCheckpointFilesPrefix() {
-    return checkpointFilesPrefix;
-  }
-
-  @Override
-  public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
-    this.checkpointFilesPrefix = checkpointFilesPrefix;
-  }
-
-  @Override
   public void writeWithWorkerIds(DataOutput output) throws IOException {
     output.writeInt(partitionId);
     output.writeInt(workerInfo.getTaskId());

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
index 240687e..caede8c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.partition;
 
+import com.google.common.collect.Lists;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
@@ -75,6 +76,11 @@ public class HashMasterPartitioner<I extends WritableComparable,
   }
 
   @Override
+  public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
+    this.partitionOwnerList = Lists.newArrayList(partitionOwners);
+  }
+
+  @Override
   public Collection<PartitionOwner> getCurrentPartitionOwners() {
     return partitionOwnerList;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
index d833895..12aa417 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
@@ -65,10 +65,9 @@ public class HashWorkerPartitioner<I extends WritableComparable,
   @Override
   public PartitionExchange updatePartitionOwners(
       WorkerInfo myWorkerInfo,
-      Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      PartitionStore<I, V, E> partitionStore) {
+      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
     return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
-        myWorkerInfo, masterSetPartitionOwners, partitionStore);
+        myWorkerInfo, masterSetPartitionOwners);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
index 50c750a..d2363fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
@@ -47,6 +47,13 @@ public interface MasterGraphPartitioner<I extends WritableComparable,
       Collection<WorkerInfo> availableWorkerInfos, int maxWorkers);
 
   /**
+   * Sets partition owners for the graph.
+   * Used when loading from a checkpoint.
+   * @param partitionOwners assigned partition owners.
+   */
+  void setPartitionOwners(Collection<PartitionOwner> partitionOwners);
+
+  /**
    * After the worker stats have been merged to a single list, the master can
    * use this information to send commands to the workers for any
    * {@link Partition} changes. This protocol is specific to the

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
index 3454d62..0d8f3cf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java
@@ -309,14 +309,12 @@ public class PartitionBalancer {
    * @param myWorkerInfo Worker info
    * @param masterSetPartitionOwners Master set partition owners, received
    *        prior to beginning the superstep
-   * @param partitionStore Partition store for the given worker
    * @return Information for the partition exchange.
    */
   public static PartitionExchange updatePartitionOwners(
       List<PartitionOwner> partitionOwnerList,
       WorkerInfo myWorkerInfo,
-      Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      PartitionStore partitionStore) {
+      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
     partitionOwnerList.clear();
     partitionOwnerList.addAll(masterSetPartitionOwners);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
index 0ac74da..f303a09 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
@@ -69,22 +69,6 @@ public interface PartitionOwner extends Writable {
   void setPreviousWorkerInfo(WorkerInfo workerInfo);
 
   /**
-   * If this is a restarted checkpoint, the worker will use this information
-   * to determine where the checkpointed partition was stored on HDFS.
-   *
-   * @return Prefix of the checkpoint HDFS files for this partition, null if
-   *         this is not a restarted superstep.
-   */
-  String getCheckpointFilesPrefix();
-
-  /**
-   * Set the checkpoint files prefix.  Master uses this.
-   *
-   * @param checkpointFilesPrefix HDFS checkpoint file prefix
-   */
-  void setCheckpointFilesPrefix(String checkpointFilesPrefix);
-
-  /**
    * Write to the output, but don't serialize the whole WorkerInfo,
    * instead use just the task id
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
index f128f34..7d4c1cb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
@@ -74,6 +75,11 @@ public abstract class SimpleMasterPartitioner<I extends WritableComparable,
   }
 
   @Override
+  public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
+    partitionOwnerList = Lists.newArrayList(partitionOwners);
+  }
+
+  @Override
   public Collection<PartitionOwner> generateChangedPartitionOwners(
       Collection<PartitionStats> allPartitionStatsList,
       Collection<WorkerInfo> availableWorkers,

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
index 3c0de44..0ee8d92 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
@@ -71,11 +71,9 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
 
   @Override
   public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
-      Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      PartitionStore<I, V, E> partitionStore) {
+      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
     PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
-        partitionOwnerList, myWorkerInfo, masterSetPartitionOwners,
-        partitionStore);
+        partitionOwnerList, myWorkerInfo, masterSetPartitionOwners);
     extractAvailableWorkers();
     return exchange;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
index 004ea81..211fedb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
@@ -75,14 +75,11 @@ public interface WorkerGraphPartitioner<I extends WritableComparable,
    * @param myWorkerInfo Worker info.
    * @param masterSetPartitionOwners Master set partition owners, received
    *        prior to beginning the superstep
-   * @param partitionStore Partition store for this worker
-   *        (can be used to fill the return map of partitions to send)
    * @return Information for the partition exchange.
    */
   PartitionExchange updatePartitionOwners(
       WorkerInfo myWorkerInfo,
-      Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      PartitionStore<I, V, E> partitionStore);
+      Collection<? extends PartitionOwner> masterSetPartitionOwners);
 
   /**
    * Get a collection of the {@link PartitionOwner} objects.

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 2c4606f..bb2865c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -142,81 +142,107 @@ public class InternalVertexRunner {
       GiraphConfiguration conf,
       String[] vertexInputData,
       String[] edgeInputData) throws Exception {
-    File tmpDir = null;
+    // Prepare input file, output folder and temporary folders
+    File tmpDir = FileUtils.createTestDir(conf.getComputationName());
     try {
-      // Prepare input file, output folder and temporary folders
-      tmpDir = FileUtils.createTestDir(conf.getComputationName());
-
-      File vertexInputFile = null;
-      File edgeInputFile = null;
-      if (conf.hasVertexInputFormat()) {
-        vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
-      }
-      if (conf.hasEdgeInputFormat()) {
-        edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
-      }
+      return run(conf, vertexInputData, edgeInputData, null, tmpDir);
+    } finally {
+      FileUtils.delete(tmpDir);
+    }
+  }
 
-      File outputDir = FileUtils.createTempDir(tmpDir, "output");
-      File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
-      File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
-      File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
+  /**
+   * Attempts to run the vertex internally in the current JVM, reading from and
+   * writing to a temporary folder on local disk. Will start its own zookeeper
+   * instance.
+   *
+   * @param conf GiraphClasses specifying which types to use
+   * @param vertexInputData linewise vertex input data
+   * @param edgeInputData linewise edge input data
+   * @param checkpointsDir if set, will use this folder
+   *                          for storing checkpoints.
+   * @param tmpDir file path for storing temporary files.
+   * @return linewise output data, or null if job fails
+   * @throws Exception if anything goes wrong
+   */
+  public static Iterable<String> run(
+      GiraphConfiguration conf,
+      String[] vertexInputData,
+      String[] edgeInputData,
+      String checkpointsDir,
+      File tmpDir) throws Exception {
+    File vertexInputFile = null;
+    File edgeInputFile = null;
+    if (conf.hasVertexInputFormat()) {
+      vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
+    }
+    if (conf.hasEdgeInputFormat()) {
+      edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
+    }
 
-      // Write input data to disk
-      if (conf.hasVertexInputFormat()) {
-        FileUtils.writeLines(vertexInputFile, vertexInputData);
-      }
-      if (conf.hasEdgeInputFormat()) {
-        FileUtils.writeLines(edgeInputFile, edgeInputData);
-      }
+    File outputDir = FileUtils.createTempDir(tmpDir, "output");
+    File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
+    File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
+    // Write input data to disk
+    if (conf.hasVertexInputFormat()) {
+      FileUtils.writeLines(vertexInputFile, vertexInputData);
+    }
+    if (conf.hasEdgeInputFormat()) {
+      FileUtils.writeLines(edgeInputFile, edgeInputData);
+    }
 
-      int localZookeeperPort = findAvailablePort();
+    int localZookeeperPort = findAvailablePort();
 
-      conf.setWorkerConfiguration(1, 1, 100.0f);
-      GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
-      GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
-      conf.setZookeeperList("localhost:" +
+    conf.setWorkerConfiguration(1, 1, 100.0f);
+    GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
+    GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
+    conf.setZookeeperList("localhost:" +
           String.valueOf(localZookeeperPort));
 
-      conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
-      GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
-          zkMgrDir.toString());
-      GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
+    conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
+    GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
+        zkMgrDir.toString());
 
-      // Create and configure the job to run the vertex
-      GiraphJob job = new GiraphJob(conf, conf.getComputationName());
+    if (checkpointsDir == null) {
+      checkpointsDir = FileUtils.createTempDir(
+          tmpDir, "_checkpoints").toString();
+    }
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
 
-      Job internalJob = job.getInternalJob();
-      if (conf.hasVertexInputFormat()) {
-        GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
-            new Path(vertexInputFile.toString()));
-      }
-      if (conf.hasEdgeInputFormat()) {
-        GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
-            new Path(edgeInputFile.toString()));
-      }
-      FileOutputFormat.setOutputPath(job.getInternalJob(),
-          new Path(outputDir.toString()));
+    // Create and configure the job to run the vertex
+    GiraphJob job = new GiraphJob(conf, conf.getComputationName());
 
-      // Configure a local zookeeper instance
-      Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
+    Job internalJob = job.getInternalJob();
+    if (conf.hasVertexInputFormat()) {
+      GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(),
+          new Path(vertexInputFile.toString()));
+    }
+    if (conf.hasEdgeInputFormat()) {
+      GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(),
+          new Path(edgeInputFile.toString()));
+    }
+    FileOutputFormat.setOutputPath(job.getInternalJob(),
+        new Path(outputDir.toString()));
 
-      QuorumPeerConfig qpConfig = new QuorumPeerConfig();
-      qpConfig.parseProperties(zkProperties);
+    // Configure a local zookeeper instance
+    Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
 
-      boolean success = runZooKeeperAndJob(qpConfig, job);
-      if (!success) {
-        return null;
-      }
+    QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+    qpConfig.parseProperties(zkProperties);
 
-      File outFile = new File(outputDir, "part-m-00000");
-      if (conf.hasVertexOutputFormat() && outFile.canRead()) {
-        return Files.readLines(outFile, Charsets.UTF_8);
-      } else {
-        return ImmutableList.of();
-      }
-    } finally {
-      FileUtils.delete(tmpDir);
+    boolean success = runZooKeeperAndJob(qpConfig, job);
+    if (!success) {
+      return null;
     }
+
+    File outFile = new File(outputDir, "part-m-00000");
+    if (conf.hasVertexOutputFormat() && outFile.canRead()) {
+      return Files.readLines(outFile, Charsets.UTF_8);
+    } else {
+      return ImmutableList.of();
+    }
+
   }
 
   /**
@@ -236,42 +262,97 @@ public class InternalVertexRunner {
       E extends Writable> void run(
       GiraphConfiguration conf,
       TestGraph<I, V, E> graph) throws Exception {
-    File tmpDir = null;
+    // Prepare temporary folders
+    File tmpDir = FileUtils.createTestDir(conf.getComputationName());
     try {
-      // Prepare temporary folders
-      tmpDir = FileUtils.createTestDir(conf.getComputationName());
+      run(conf, graph, tmpDir, null);
+    } finally {
+      FileUtils.delete(tmpDir);
+    }
+  }
 
-      File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
-      File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
-      File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
+  /**
+   * Attempts to run the vertex internally in the current JVM,
+   * reading from an in-memory graph. Will start its own zookeeper
+   * instance.
+   *
+   * @param <I> Vertex ID
+   * @param <V> Vertex Value
+   * @param <E> Edge Value
+   * @param conf GiraphClasses specifying which types to use
+   * @param graph input graph
+   * @param tmpDir file path for storing temporary files.
+   * @param checkpointsDir if set, will use this folder
+   *                          for storing checkpoints.
+   * @throws Exception if anything goes wrong
+   */
+  public static <I extends WritableComparable,
+      V extends Writable,
+      E extends Writable> void run(
+      GiraphConfiguration conf,
+      TestGraph<I, V, E> graph,
+      File tmpDir,
+      String checkpointsDir) throws Exception {
+    File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
+    File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
+
+    if (checkpointsDir == null) {
+      checkpointsDir = FileUtils.
+          createTempDir(tmpDir, "_checkpoints").toString();
+    }
 
-      conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
+    conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
 
-      // Create and configure the job to run the vertex
-      GiraphJob job = new GiraphJob(conf, conf.getComputationName());
+    // Create and configure the job to run the vertex
+    GiraphJob job = new GiraphJob(conf, conf.getComputationName());
 
-      InMemoryVertexInputFormat.setGraph(graph);
+    InMemoryVertexInputFormat.setGraph(graph);
 
-      int localZookeeperPort = findAvailablePort();
+    int localZookeeperPort = findAvailablePort();
 
-      conf.setWorkerConfiguration(1, 1, 100.0f);
-      GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
-      GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
-      GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" +
+    conf.setWorkerConfiguration(1, 1, 100.0f);
+    GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
+    GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
+    GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" +
           String.valueOf(localZookeeperPort));
 
-      conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
-      GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
-          zkMgrDir.toString());
-      GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
+    conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
+    GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
+        zkMgrDir.toString());
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
+
+    // Configure a local zookeeper instance
+    Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
+
+    QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+    qpConfig.parseProperties(zkProperties);
 
-      // Configure a local zookeeper instance
-      Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
+    runZooKeeperAndJob(qpConfig, job);
 
-      QuorumPeerConfig qpConfig = new QuorumPeerConfig();
-      qpConfig.parseProperties(zkProperties);
+  }
 
-      runZooKeeperAndJob(qpConfig, job);
+  /**
+   * Attempts to run the vertex internally in the current JVM, reading and
+   * writing to an in-memory graph. Will start its own zookeeper
+   * instance.
+   *
+   * @param <I> Vertex ID
+   * @param <V> Vertex Value
+   * @param <E> Edge Value
+   * @param conf GiraphClasses specifying which types to use
+   * @param graph input graph
+   * @return Output graph
+   * @throws Exception if anything goes wrong
+   */
+  public static <I extends WritableComparable,
+      V extends Writable,
+      E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
+      GiraphConfiguration conf,
+      TestGraph<I, V, E> graph) throws Exception {
+    // Prepare temporary folders
+    File tmpDir = FileUtils.createTestDir(conf.getComputationName());
+    try {
+      return runWithInMemoryOutput(conf, graph, tmpDir, null);
     } finally {
       FileUtils.delete(tmpDir);
     }
@@ -287,6 +368,9 @@ public class InternalVertexRunner {
    * @param <E> Edge Value
    * @param conf GiraphClasses specifying which types to use
    * @param graph input graph
+   * @param tmpDir file path for storing temporary files.
+   * @param checkpointsDir if set, will use this folder
+   *                       for storing checkpoints.
    * @return Output graph
    * @throws Exception if anything goes wrong
    */
@@ -294,10 +378,12 @@ public class InternalVertexRunner {
       V extends Writable,
       E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(
       GiraphConfiguration conf,
-      TestGraph<I, V, E> graph) throws Exception {
+      TestGraph<I, V, E> graph,
+      File tmpDir,
+      String checkpointsDir) throws Exception {
     conf.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class);
     InMemoryVertexOutputFormat.initializeOutputGraph(conf);
-    InternalVertexRunner.run(conf, graph);
+    InternalVertexRunner.run(conf, graph, tmpDir, checkpointsDir);
     return InMemoryVertexOutputFormat.getOutputGraph();
   }
 

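A sketch of how a test can drive the new overloads to exercise restart,
assuming a configured GiraphConfiguration conf and an in-memory TestGraph
graph; the directory wiring here is illustrative, and TestCheckpointing in
this patch is the authoritative usage:

    File tmpDir = FileUtils.createTestDir(conf.getComputationName());
    String checkpointsDir =
        FileUtils.createTempDir(tmpDir, "_checkpoints").toString();
    try {
      // First run stores checkpoints under the shared checkpointsDir ...
      InternalVertexRunner.runWithInMemoryOutput(conf, graph, tmpDir,
          checkpointsDir);
      // ... a later run can set RESTART_JOB_ID to the first job's id and
      // pass the same checkpointsDir to resume from a saved superstep.
    } finally {
      FileUtils.delete(tmpDir);
    }
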
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
index af45426..a5b1567 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
@@ -65,6 +65,6 @@ public class ExtendedDataInputOutput extends DataInputOutput {
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    WritableUtils.readExtendedDataOutput(in, conf);
+    dataOutput = WritableUtils.readExtendedDataOutput(in, conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index de7af28..0d90a59 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -70,10 +70,13 @@ import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.log4j.Level;
@@ -92,9 +95,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import net.iharder.Base64;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -110,6 +111,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -566,7 +568,7 @@ public class BspServiceWorker<I extends WritableComparable,
     Collection<? extends PartitionOwner> masterSetPartitionOwners =
         startSuperstep();
     workerGraphPartitioner.updatePartitionOwners(
-        getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+        getWorkerInfo(), masterSetPartitionOwners);
 
     /*if[HADOOP_NON_SECURE]
       workerClient.setup();
@@ -1370,73 +1372,42 @@ public class BspServiceWorker<I extends WritableComparable,
     // Algorithm:
     // For each partition, dump vertices and messages
     Path metadataFilePath =
-        new Path(getCheckpointBasePath(getSuperstep()) + "." +
-            getHostnamePartitionId() +
-            CHECKPOINT_METADATA_POSTFIX);
-    Path verticesFilePath =
-        new Path(getCheckpointBasePath(getSuperstep()) + "." +
-            getHostnamePartitionId() +
-            CHECKPOINT_VERTICES_POSTFIX);
+        createCheckpointFilePathSafe(CHECKPOINT_METADATA_POSTFIX);
     Path validFilePath =
-        new Path(getCheckpointBasePath(getSuperstep()) + "." +
-            getHostnamePartitionId() +
-            CHECKPOINT_VALID_POSTFIX);
+        createCheckpointFilePathSafe(CHECKPOINT_VALID_POSTFIX);
+    Path checkpointFilePath =
+        createCheckpointFilePathSafe(CHECKPOINT_DATA_POSTFIX);
 
-    // Remove these files if they already exist (shouldn't though, unless
-    // of previous failure of this worker)
-    if (getFs().delete(validFilePath, false)) {
-      LOG.warn("storeCheckpoint: Removed valid file " +
-          validFilePath);
-    }
-    if (getFs().delete(metadataFilePath, false)) {
-      LOG.warn("storeCheckpoint: Removed metadata file " +
-          metadataFilePath);
-    }
-    if (getFs().delete(verticesFilePath, false)) {
-      LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
-    }
 
-    FSDataOutputStream verticesOutputStream =
-        getFs().create(verticesFilePath);
-    ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
-    DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
-    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
-      Partition<I, V, E> partition =
-          getPartitionStore().getOrCreatePartition(partitionId);
-      long startPos = verticesOutputStream.getPos();
-      partition.write(verticesOutputStream);
-      // write messages
-      getServerData().getCurrentMessageStore().writePartition(
-          verticesOutputStream, partition.getId());
-      // Write the metadata for this partition
-      // Format:
-      // <index count>
-      //   <index 0 start pos><partition id>
-      //   <index 1 start pos><partition id>
-      metadataOutput.writeLong(startPos);
-      metadataOutput.writeInt(partition.getId());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("storeCheckpoint: Vertex file starting " +
-            "offset = " + startPos + ", length = " +
-            (verticesOutputStream.getPos() - startPos) +
-            ", partition = " + partition.toString());
-      }
-      getPartitionStore().putPartition(partition);
-      getContext().progress();
-    }
     // Metadata is buffered and written at the end since it's small and
     // needs to know how many partitions this worker owns
     FSDataOutputStream metadataOutputStream =
         getFs().create(metadataFilePath);
     metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
-    metadataOutputStream.write(metadataByteStream.toByteArray());
+
+    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+      metadataOutputStream.writeInt(partitionId);
+    }
     metadataOutputStream.close();
-    verticesOutputStream.close();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("storeCheckpoint: Finished metadata (" +
-          metadataFilePath + ") and vertices (" + verticesFilePath + ").");
+
+    storeCheckpointVertices();
+
+    FSDataOutputStream checkpointOutputStream =
+        getFs().create(checkpointFilePath);
+    workerContext.write(checkpointOutputStream);
+    getContext().progress();
+
+    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+      // write messages
+      checkpointOutputStream.writeInt(partitionId);
+      getServerData().getCurrentMessageStore().writePartition(
+          checkpointOutputStream, partitionId);
+      getContext().progress();
+
     }
 
+    checkpointOutputStream.close();
+
     getFs().createNewFile(validFilePath);
 
     // Notify master that checkpoint is stored
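
Per this patch, a worker's checkpoint for a superstep is split across these
files (<base> is getCheckpointBasePath(superstep), <task> the task partition;
exact binary layouts depend on the Writable implementations involved):

    <base>.<task>.metadata        number of partitions, then one partition id
                                  per owned partition
    <base>.<task>.data            WorkerContext data, then per partition an
                                  int partition id and its message store
    <base>.<task>_<id>.vertices   one file per partition, written in parallel
                                  and optionally compressed (see
                                  storeCheckpointVertices below)
    <base>.<task>.valid           empty marker written last; a checkpoint
                                  without it is not considered valid
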
@@ -1462,116 +1433,247 @@ public class BspServiceWorker<I extends WritableComparable,
     }
   }
 
+  /**
+   * Create checkpoint file safely.
+   * If the file already exists, remove it first.
+   * @param name file extension
+   * @return full file path to newly created file
+   * @throws IOException
+   */
+  private Path createCheckpointFilePathSafe(String name) throws IOException {
+    Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + "." +
+        getTaskPartition() + name);
+    // Remove these files if they already exist (shouldn't though, unless
+    // of previous failure of this worker)
+    if (getFs().delete(validFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed " + name + " file " +
+          validFilePath);
+    }
+    return validFilePath;
+  }
+
+  /**
+   * Returns path to saved checkpoint.
+   * Doesn't check if file actually exists.
+   * @param superstep saved superstep.
+   * @param name extension name
+   * @return full file path to checkpoint file
+   */
+  private Path getSavedCheckpoint(long superstep, String name) {
+    return new Path(getSavedCheckpointBasePath(superstep) + "." +
+        getTaskPartition() + name);
+  }
+
+  /**
+   * Save partitions. To speed up this operation,
+   * it runs in multiple threads.
+   */
+  private void storeCheckpointVertices() {
+    final int numPartitions = getPartitionStore().getNumPartitions();
+    int numThreads = Math.min(
+        GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
+        numPartitions);
+
+    final Queue<Integer> partitionIdQueue =
+        (numPartitions == 0) ? new LinkedList<Integer>() :
+            new ArrayBlockingQueue<Integer>(numPartitions);
+    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+
+    final CompressionCodec codec =
+        new CompressionCodecFactory(getConfiguration())
+            .getCodecByClassName(
+                GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
+                    .get(getConfiguration()));
+
+    long t0 = System.currentTimeMillis();
+
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+
+          @Override
+          public Void call() throws Exception {
+            while (!partitionIdQueue.isEmpty()) {
+              Integer partitionId = partitionIdQueue.poll();
+              if (partitionId == null) {
+                break;
+              }
+              Path path =
+                  createCheckpointFilePathSafe("_" + partitionId +
+                      CHECKPOINT_VERTICES_POSTFIX);
+
+              FSDataOutputStream uncompressedStream =
+                  getFs().create(path);
+
+              DataOutputStream stream = codec == null ? uncompressedStream :
+                  new DataOutputStream(
+                      codec.createOutputStream(uncompressedStream));
+
+              Partition<I, V, E> partition =
+                  getPartitionStore().getOrCreatePartition(partitionId);
+
+              partition.write(stream);
+
+              getPartitionStore().putPartition(partition);
+
+              stream.close();
+              uncompressedStream.close();
+            }
+            return null;
+          }
+
+        };
+      }
+    };
+
+    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+        "checkpoint-vertices-%d", getContext());
+
+    LOG.info("Save checkpoint in " + (System.currentTimeMillis() - t0) +
+        " ms, using " + numThreads + " threads");
+  }
+
+  /**
+   * Load saved partitions in multiple threads.
+   * @param superstep superstep to load
+   * @param partitions list of partitions to load
+   */
+  private void loadCheckpointVertices(final long superstep,
+                                      List<Integer> partitions) {
+    int numThreads = Math.min(
+        GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
+        partitions.size());
+
+    final Queue<Integer> partitionIdQueue =
+        new ConcurrentLinkedQueue<>(partitions);
+
+    final CompressionCodec codec =
+        new CompressionCodecFactory(getConfiguration())
+            .getCodecByClassName(
+                GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
+                    .get(getConfiguration()));
+
+    long t0 = System.currentTimeMillis();
+
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+
+          @Override
+          public Void call() throws Exception {
+            while (!partitionIdQueue.isEmpty()) {
+              Integer partitionId = partitionIdQueue.poll();
+              if (partitionId == null) {
+                break;
+              }
+              Path path =
+                  getSavedCheckpoint(superstep, "_" + partitionId +
+                      CHECKPOINT_VERTICES_POSTFIX);
+
+              FSDataInputStream compressedStream =
+                  getFs().open(path);
+
+              DataInputStream stream = codec == null ? compressedStream :
+                  new DataInputStream(
+                      codec.createInputStream(compressedStream));
+
+              Partition<I, V, E> partition =
+                  getConfiguration().createPartition(partitionId, getContext());
+
+              partition.readFields(stream);
+
+              getPartitionStore().addPartition(partition);
+
+              stream.close();
+            }
+            return null;
+          }
+
+        };
+      }
+    };
+
+    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+        "load-vertices-%d", getContext());
+
+    LOG.info("Loaded checkpoint in " + (System.currentTimeMillis() - t0) +
+        " ms, using " + numThreads + " threads");
+  }
+
   @Override
   public VertexEdgeCount loadCheckpoint(long superstep) {
-    try {
-      // clear old message stores
-      getServerData().getIncomingMessageStore().clearAll();
-      getServerData().getCurrentMessageStore().clearAll();
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "loadCheckpoint: Failed to clear message stores ", e);
-    }
+    Path metadataFilePath =
+        getSavedCheckpoint(superstep, CHECKPOINT_METADATA_POSTFIX);
 
+    Path checkpointFilePath =
+        getSavedCheckpoint(superstep, CHECKPOINT_DATA_POSTFIX);
     // Algorithm:
     // Examine all the partition owners and load the ones
     // that match my hostname and id from the master designated checkpoint
     // prefixes.
-    long startPos = 0;
-    int loadedPartitions = 0;
-    for (PartitionOwner partitionOwner :
-      workerGraphPartitioner.getPartitionOwners()) {
-      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
-        String metadataFile =
-            partitionOwner.getCheckpointFilesPrefix() +
-            CHECKPOINT_METADATA_POSTFIX;
-        String partitionsFile =
-            partitionOwner.getCheckpointFilesPrefix() +
-            CHECKPOINT_VERTICES_POSTFIX;
-        try {
-          int partitionId = -1;
-          DataInputStream metadataStream =
-              getFs().open(new Path(metadataFile));
-          int partitions = metadataStream.readInt();
-          for (int i = 0; i < partitions; ++i) {
-            startPos = metadataStream.readLong();
-            partitionId = metadataStream.readInt();
-            if (partitionId == partitionOwner.getPartitionId()) {
-              break;
-            }
-          }
-          if (partitionId != partitionOwner.getPartitionId()) {
-            throw new IllegalStateException(
-                "loadCheckpoint: " + partitionOwner +
-                " not found!");
-          }
-          metadataStream.close();
-          Partition<I, V, E> partition =
-              getConfiguration().createPartition(partitionId, getContext());
-          DataInputStream partitionsStream =
-              getFs().open(new Path(partitionsFile));
-          if (partitionsStream.skip(startPos) != startPos) {
-            throw new IllegalStateException(
-                "loadCheckpoint: Failed to skip " + startPos +
-                " on " + partitionsFile);
-          }
-          partition.readFields(partitionsStream);
-          getServerData().getIncomingMessageStore().readFieldsForPartition(
-              partitionsStream, partitionId);
-          partitionsStream.close();
-          if (LOG.isInfoEnabled()) {
-            LOG.info("loadCheckpoint: Loaded partition " +
-                partition);
-          }
-          if (getPartitionStore().hasPartition(partitionId)) {
-            throw new IllegalStateException(
-                "loadCheckpoint: Already has partition owner " +
-                    partitionOwner);
-          }
-          getPartitionStore().addPartition(partition);
-          getContext().progress();
-          ++loadedPartitions;
-        } catch (IOException e) {
-          throw new RuntimeException(
-              "loadCheckpoint: Failed to get partition owner " +
-                  partitionOwner, e);
-        }
+    try {
+      DataInputStream metadataStream =
+          getFs().open(metadataFilePath);
+
+      int partitions = metadataStream.readInt();
+      List<Integer> partitionIds = new ArrayList<>(partitions);
+      for (int i = 0; i < partitions; i++) {
+        int partitionId = metadataStream.readInt();
+        partitionIds.add(partitionId);
       }
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
-          " partitions of out " +
-          workerGraphPartitioner.getPartitionOwners().size() +
-          " total.");
-    }
 
-    // Load global stats and superstep classes
-    GlobalStats globalStats = new GlobalStats();
-    SuperstepClasses superstepClasses = new SuperstepClasses();
-    String finalizedCheckpointPath =
-        getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
-    try {
+      loadCheckpointVertices(superstep, partitionIds);
+
+      getContext().progress();
+
+      metadataStream.close();
+
+      DataInputStream checkpointStream =
+          getFs().open(checkpointFilePath);
+      workerContext.readFields(checkpointStream);
+
+      // Load global stats and superstep classes
+      GlobalStats globalStats = new GlobalStats();
+      SuperstepClasses superstepClasses = new SuperstepClasses();
+      String finalizedCheckpointPath =
+          getSavedCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
       DataInputStream finalizedStream =
           getFs().open(new Path(finalizedCheckpointPath));
       globalStats.readFields(finalizedStream);
       superstepClasses.readFields(finalizedStream);
       getConfiguration().updateSuperstepClasses(superstepClasses);
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "loadCheckpoint: Failed to load global stats and superstep classes",
-          e);
-    }
+      getServerData().resetMessageStores();
 
-    getServerData().prepareSuperstep();
-    // Communication service needs to setup the connections prior to
-    // processing vertices
+      for (int i = 0; i < partitions; i++) {
+        int partitionId = checkpointStream.readInt();
+        getServerData().getCurrentMessageStore().readFieldsForPartition(
+            checkpointStream, partitionId);
+      }
+      checkpointStream.close();
+
+      if (LOG.isInfoEnabled()) {
+        LOG.info("loadCheckpoint: Loaded " +
+            workerGraphPartitioner.getPartitionOwners().size() +
+            " total.");
+      }
+
+      // Communication service needs to setup the connections prior to
+      // processing vertices
 /*if[HADOOP_NON_SECURE]
     workerClient.setup();
 else[HADOOP_NON_SECURE]*/
-    workerClient.setup(getConfiguration().authenticate());
+      workerClient.setup(getConfiguration().authenticate());
 /*end[HADOOP_NON_SECURE]*/
-    return new VertexEdgeCount(globalStats.getVertexCount(),
-        globalStats.getEdgeCount());
+      return new VertexEdgeCount(globalStats.getVertexCount(),
+          globalStats.getEdgeCount());
+
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "loadCheckpoint: Failed for superstep=" + superstep, e);
+    }
   }
 
   /**
@@ -1651,7 +1753,7 @@ else[HADOOP_NON_SECURE]*/
     // 5. Add the partitions to myself.
     PartitionExchange partitionExchange =
         workerGraphPartitioner.updatePartitionOwners(
-            getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+            getWorkerInfo(), masterSetPartitionOwners);
     workerClient.openConnections();
 
     Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =

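
A note on the compression path above: the per-partition vertex files are
written through Hadoop's CompressionCodecFactory, falling back to the raw
FSDataOutputStream when CHECKPOINT_COMPRESSION_CODEC resolves to no codec.
A self-contained sketch of the same wrap/unwrap pattern (assumptions: a
Configuration `conf`, FileSystem `fs`, and Path `path` are in scope;
GzipCodec is only an example class name):

    CompressionCodec codec = new CompressionCodecFactory(conf)
        .getCodecByClassName("org.apache.hadoop.io.compress.GzipCodec");
    FSDataOutputStream raw = fs.create(path);
    DataOutputStream out = codec == null ? raw :
        new DataOutputStream(codec.createOutputStream(raw));
    out.writeInt(42);   // stands in for partition.write(out) in the commit
    out.close();        // closing the wrapper flushes the codec
    raw.close();        // the commit closes both streams explicitly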
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index 29835c5..aca9944 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -25,6 +25,9 @@ import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -33,9 +36,8 @@ import java.util.List;
  */
 @SuppressWarnings("rawtypes")
 public abstract class WorkerContext
-    extends DefaultImmutableClassesGiraphConfigurable
-    implements WorkerAggregatorUsage {
-
+  extends DefaultImmutableClassesGiraphConfigurable
+  implements WorkerAggregatorUsage, Writable {
   /** Global graph state */
   private GraphState graphState;
   /** Worker aggregator usage */
@@ -203,4 +205,12 @@ public abstract class WorkerContext
   public <A extends Writable> A getAggregatedValue(String name) {
     return workerAggregatorUsage.<A>getAggregatedValue(name);
   }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+  }
 }

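Since the default write/readFields bodies are empty, existing WorkerContext
subclasses keep compiling unchanged; a context that carries state across a
restart overrides both methods and chains super, as TestCheckpointing below
does. A minimal sketch, assuming a single counter field worth persisting and
the java.io imports from the diff above (the class name is hypothetical):

    public class CountingWorkerContext extends DefaultWorkerContext {
      /** State that must survive a checkpoint restart */
      private long processed;

      @Override
      public void write(DataOutput out) throws IOException {
        super.write(out);          // let the base class write first
        out.writeLong(processed);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        processed = in.readLong();
      }
    }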
http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
index 96bd5d7..57bebbd 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
@@ -66,7 +66,7 @@ public class SimpleRangePartitionFactoryTest {
 
     WorkerGraphPartitioner<LongWritable, Writable, Writable> workerPartitioner =
         factory.createWorkerGraphPartitioner();
-    workerPartitioner.updatePartitionOwners(null, owners, null);
+    workerPartitioner.updatePartitionOwners(null, owners);
     LongWritable longWritable = new LongWritable();
 
     int[] partitions = new int[keySpaceSize];

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
new file mode 100644
index 0000000..387b937
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.examples.SimpleSuperstepComputation;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that the worker context and master computation
+ * are properly saved at a checkpoint and loaded back on restart.
+ */
+public class TestCheckpointing extends BspCase {
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(TestCheckpointing.class);
+  /** ID to be used with test job */
+  public static final String TEST_JOB_ID = "test_job";
+  /**
+   * Compute will double-check that we don't run supersteps
+   * earlier than the one specified by this key. That way we ensure
+   * that the computation actually restarted and was not recalculated
+   * from the beginning.
+   */
+  public static final String KEY_MIN_SUPERSTEP = "minimum.superstep";
+
+  /**
+   * Create the test case
+   */
+  public TestCheckpointing() {
+    super(TestCheckpointing.class.getName());
+  }
+
+
+  @Test
+  public void testBspCheckpoint()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Path checkpointsDir = getTempPath("checkpointing");
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(
+        CheckpointComputation.class);
+    conf.setWorkerContextClass(
+        CheckpointVertexWorkerContext.class);
+    conf.setMasterComputeClass(
+        CheckpointVertexMasterCompute.class);
+    conf.setVertexInputFormatClass(
+        SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(
+        SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
+    conf.set("mapred.job.id", TEST_JOB_ID);
+    conf.set(KEY_MIN_SUPERSTEP, "0");
+    GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
+
+    GiraphConfiguration configuration = job.getConfiguration();
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
+    GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
+    configuration.setCheckpointFrequency(2);
+
+    assertTrue(job.run(true));
+
+    long idSum = 0;
+    if (!runningInDistributedMode()) {
+      FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
+          outputPath);
+      idSum = CheckpointVertexWorkerContext
+          .getFinalSum();
+      LOG.info("testBspCheckpoint: idSum = " + idSum +
+          " fileLen = " + fileStatus.getLen());
+    }
+
+    // Restart the test from superstep 2
+    LOG.info("testBspCheckpoint: Restarting from superstep 2" +
+        " with checkpoint path = " + checkpointsDir);
+    outputPath = getTempPath("checkpointing_restarted");
+
+    GiraphConstants.RESTART_JOB_ID.set(conf, TEST_JOB_ID);
+    conf.set("mapred.job.id", "restarted_test_job");
+    conf.set(GiraphConstants.RESTART_SUPERSTEP, "2");
+    conf.set(KEY_MIN_SUPERSTEP, "2");
+
+    GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+        conf, outputPath);
+
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
+        checkpointsDir.toString());
+
+    assertTrue(restartedJob.run(true));
+    if (!runningInDistributedMode()) {
+      long idSumRestarted =
+          CheckpointVertexWorkerContext
+              .getFinalSum();
+      LOG.info("testBspCheckpoint: idSumRestarted = " +
+          idSumRestarted);
+      assertEquals(idSum, idSumRestarted);
+    }
+  }
+
+
+  /**
+   * Actual computation.
+   */
+  public static class CheckpointComputation extends
+      BasicComputation<LongWritable, IntWritable, FloatWritable,
+          FloatWritable> {
+    @Override
+    public void compute(
+        Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+        Iterable<FloatWritable> messages) throws IOException {
+      CheckpointVertexWorkerContext workerContext = getWorkerContext();
+      assertEquals(getSuperstep() + 1, workerContext.testValue);
+
+      if (getSuperstep() <
+          getConf().getInt(KEY_MIN_SUPERSTEP, Integer.MAX_VALUE)) {
+        fail("Should not be running compute on superstep " + getSuperstep());
+      }
+
+      if (getSuperstep() > 4) {
+        vertex.voteToHalt();
+        return;
+      }
+
+      aggregate(LongSumAggregator.class.getName(),
+          new LongWritable(vertex.getId().get()));
+
+      float msgValue = 0.0f;
+      for (FloatWritable message : messages) {
+        float curMsgValue = message.get();
+        msgValue += curMsgValue;
+      }
+
+      int vertexValue = vertex.getValue().get();
+      vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
+      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+        FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
+            (float) vertexValue);
+        Edge<LongWritable, FloatWritable> newEdge =
+            EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
+        vertex.addEdge(newEdge);
+        sendMessage(edge.getTargetVertexId(), newEdgeValue);
+      }
+    }
+  }
+
+  /**
+   * Worker context associated.
+   */
+  public static class CheckpointVertexWorkerContext
+      extends DefaultWorkerContext {
+    /** User can access this after the application finishes if local */
+    private static long FINAL_SUM;
+
+    /** Superstep counter, saved at checkpoint and restored on restart */
+    private int testValue;
+
+    public static long getFinalSum() {
+      return FINAL_SUM;
+    }
+
+    @Override
+    public void postApplication() {
+      setFinalSum(this.<LongWritable>getAggregatedValue(
+          LongSumAggregator.class.getName()).get());
+      LOG.info("FINAL_SUM=" + FINAL_SUM);
+    }
+
+    /**
+     * Set the final sum
+     *
+     * @param value sum
+     */
+    private static void setFinalSum(long value) {
+      FINAL_SUM = value;
+    }
+
+    @Override
+    public void preSuperstep() {
+      assertEquals(getSuperstep(), testValue++);
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      super.readFields(dataInput);
+      testValue = dataInput.readInt();
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      super.write(dataOutput);
+      dataOutput.writeInt(testValue);
+    }
+  }
+
+  /**
+   * Master compute
+   */
+  public static class CheckpointVertexMasterCompute extends
+      DefaultMasterCompute {
+
+    /** Superstep counter, saved at checkpoint and restored on restart */
+    private int testValue = 0;
+
+    @Override
+    public void compute() {
+      long superstep = getSuperstep();
+      assertEquals(superstep, testValue++);
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(LongSumAggregator.class.getName(),
+          LongSumAggregator.class);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      testValue = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      out.writeInt(testValue);
+    }
+  }
+}
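
For reference, the configuration knobs this test exercises, in the order a
user would set them (a sketch based on the test above; the directory value
is illustrative):

    GiraphConfiguration conf = new GiraphConfiguration();
    conf.setCheckpointFrequency(2);   // checkpoint every 2 supersteps
    GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, "/tmp/giraph/checkpoints");
    GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);

    // To restart a job from its saved checkpoint at superstep 2:
    GiraphConstants.RESTART_JOB_ID.set(conf, "test_job");
    conf.set(GiraphConstants.RESTART_SUPERSTEP, "2");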