You are viewing a plain-text version of this content. The link to the canonical (original archive) version was not preserved in this plain-text copy.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:11 UTC

[28/47] git commit: updated refs/heads/release-1.1 to 4c139ee

GIRAPH-933: Checkpointing improvements (edunov via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5adca63d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5adca63d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5adca63d

Branch: refs/heads/release-1.1
Commit: 5adca63deca25d84f4fdea053c35a85efc8bbb3d
Parents: bc9f823
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Aug 15 15:03:19 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Aug 15 15:03:19 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../java/org/apache/giraph/bsp/BspService.java  | 108 ++++++----
 .../apache/giraph/bsp/CentralizedService.java   |   9 -
 .../giraph/bsp/CentralizedServiceMaster.java    |   4 +-
 .../giraph/bsp/CentralizedServiceWorker.java    |   8 +
 .../org/apache/giraph/bsp/CheckpointStatus.java |  31 +++
 .../org/apache/giraph/bsp/SuperstepState.java   |  30 ++-
 .../java/org/apache/giraph/comm/ServerData.java |  10 +
 .../org/apache/giraph/conf/GiraphConstants.java |  10 +-
 .../giraph/graph/FinishedSuperstepStats.java    |  20 +-
 .../org/apache/giraph/graph/GlobalStats.java    |  27 ++-
 .../apache/giraph/graph/GraphTaskManager.java   |  44 ++--
 .../job/DefaultGiraphJobRetryChecker.java       |   5 +
 .../java/org/apache/giraph/job/GiraphJob.java   |  23 ++
 .../giraph/job/GiraphJobRetryChecker.java       |   6 +
 .../java/org/apache/giraph/job/HadoopUtils.java |  15 ++
 .../apache/giraph/master/BspServiceMaster.java  | 147 +++++++++----
 .../org/apache/giraph/master/MasterThread.java  |  10 +-
 .../apache/giraph/utils/CheckpointingUtils.java |  62 ++++++
 .../org/apache/giraph/utils/WritableUtils.java  |  63 ++++++
 .../apache/giraph/worker/BspServiceWorker.java  |  47 ++++-
 .../apache/giraph/utils/TestWritableUtils.java  |  70 +++++++
 .../org/apache/giraph/TestCheckpointing.java    | 208 +++++++++++++++----
 pom.xml                                         |   2 +-
 24 files changed, 787 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 66136b2..b64ce2c 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-933: Checkpointing improvements (edunov via majakabiljo)
+
   GIRAPH-943: Perf regression due to netty 4.0.21 (pavanka)
 
   GIRAPH-935: Loosen modifiers when needed (ikabiljo via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 02577b9..c418a89 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -24,13 +24,16 @@ import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.partition.GraphPartitionerFactory;
+import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.giraph.zk.ZooKeeperManager;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -50,10 +53,10 @@ import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.security.InvalidParameterException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
 import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID;
 
 /**
@@ -162,6 +165,8 @@ public abstract class BspService<I extends WritableComparable,
   public static final String WORKER_PROGRESSES = "/_workerProgresses";
   /** Denotes that computation should be halted */
   public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
+  /** User sets this flag to checkpoint and stop the job */
+  public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop";
   /** Denotes which workers have been cleaned up */
   public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
   /** JSON partition stats key */
@@ -283,8 +288,6 @@ public abstract class BspService<I extends WritableComparable,
   private final GraphTaskManager<I, V, E> graphTaskManager;
   /** File system */
   private final FileSystem fs;
-  /** Checkpoint frequency */
-  private final int checkpointFrequency;
 
   /**
    * Constructor.
@@ -325,13 +328,6 @@ public abstract class BspService<I extends WritableComparable,
     this.taskPartition = conf.getTaskPartition();
     this.restartedSuperstep = conf.getLong(
         GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
-    this.cachedSuperstep = restartedSuperstep;
-    if ((restartedSuperstep != UNSET_SUPERSTEP) &&
-        (restartedSuperstep < 0)) {
-      throw new IllegalArgumentException(
-          "BspService: Invalid superstep to restart - " +
-              restartedSuperstep);
-    }
     try {
       this.hostname = conf.getLocalHostname();
     } catch (UnknownHostException e) {
@@ -340,8 +336,6 @@ public abstract class BspService<I extends WritableComparable,
     this.hostnamePartitionId = hostname + "_" + getTaskPartition();
     this.graphPartitionerFactory = conf.createGraphPartitioner();
 
-    this.checkpointFrequency = conf.getCheckpointFrequency();
-
     basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
     getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
         basePath);
@@ -360,13 +354,14 @@ public abstract class BspService<I extends WritableComparable,
     cleanedUpPath = basePath + CLEANED_UP_DIR;
 
     String restartJobId = RESTART_JOB_ID.get(conf);
+
     savedCheckpointBasePath =
-        CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
-            CHECKPOINT_DIRECTORY.getDefaultValue() + "/" +
-                (restartJobId == null ? getJobId() : restartJobId));
-    checkpointBasePath =
-        CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
-            CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
+        CheckpointingUtils.getCheckpointBasePath(getConfiguration(),
+            restartJobId == null ? getJobId() : restartJobId);
+
+    checkpointBasePath = CheckpointingUtils.
+        getCheckpointBasePath(getConfiguration(), getJobId());
+
     masterElectionPath = basePath + MASTER_ELECTION_DIR;
     myProgressPath = basePath + WORKER_PROGRESSES + "/" + taskPartition;
     String serverPortList = conf.getZookeeperList();
@@ -392,6 +387,24 @@ public abstract class BspService<I extends WritableComparable,
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+
+    //Trying to restart from the latest superstep
+    if (restartJobId != null &&
+        restartedSuperstep == UNSET_SUPERSTEP) {
+      try {
+        restartedSuperstep = getLastCheckpointedSuperstep();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    this.cachedSuperstep = restartedSuperstep;
+    if ((restartedSuperstep != UNSET_SUPERSTEP) &&
+        (restartedSuperstep < 0)) {
+      throw new IllegalArgumentException(
+          "BspService: Invalid superstep to restart - " +
+              restartedSuperstep);
+    }
+
   }
 
   /**
@@ -643,28 +656,6 @@ public abstract class BspService<I extends WritableComparable,
   }
 
   /**
-   * Should checkpoint on this superstep?  If checkpointing, always
-   * checkpoint the first user superstep.  If restarting, the first
-   * checkpoint is after the frequency has been met.
-   *
-   * @param superstep Decide if checkpointing no this superstep
-   * @return True if this superstep should be checkpointed, false otherwise
-   */
-  public final boolean checkpointFrequencyMet(long superstep) {
-    if (checkpointFrequency == 0) {
-      return false;
-    }
-    long firstCheckpoint = INPUT_SUPERSTEP + 1;
-    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
-      firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
-    }
-    if (superstep < firstCheckpoint) {
-      return false;
-    }
-    return ((superstep - firstCheckpoint) % checkpointFrequency) == 0;
-  }
-
-  /**
    * Get the file system
    *
    * @return file system
@@ -1241,4 +1232,41 @@ public abstract class BspService<I extends WritableComparable,
     }
     return eventProcessed;
   }
+
+  /**
+   * Get the last saved superstep.
+   *
+   * @return Last good superstep number
+   * @throws IOException
+   */
+  protected long getLastCheckpointedSuperstep() throws IOException {
+    FileStatus[] fileStatusArray =
+        getFs().listStatus(new Path(savedCheckpointBasePath),
+            new FinalizedCheckpointPathFilter());
+    if (fileStatusArray == null) {
+      return -1;
+    }
+    Arrays.sort(fileStatusArray);
+    long lastCheckpointedSuperstep = getCheckpoint(
+        fileStatusArray[fileStatusArray.length - 1].getPath());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
+          lastCheckpointedSuperstep + " from " +
+          fileStatusArray[fileStatusArray.length - 1].
+              getPath().toString());
+    }
+    return lastCheckpointedSuperstep;
+  }
+
+  /**
+   * Only get the finalized checkpoint files
+   */
+  private static class FinalizedCheckpointPathFilter implements PathFilter {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX);
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
index ff3e427..560f1fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -51,15 +51,6 @@ public interface CentralizedService<I extends WritableComparable,
   long getRestartedSuperstep();
 
   /**
-   * Given a superstep, should it be checkpointed based on the
-   * checkpoint frequency?
-   *
-   * @param superstep superstep to check against frequency
-   * @return true if checkpoint frequency met or superstep is 1.
-   */
-  boolean checkpointFrequencyMet(long superstep);
-
-  /**
    * Get list of workers
    *
    * @return List of workers

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index e5b7cf3..9b4f9d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -175,7 +175,9 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
    *
    * @throws IOException
    * @throws InterruptedException
+   * @param superstepState what was the state
+   *                       of the last complete superstep?
    */
-  void cleanup()
+  void cleanup(SuperstepState superstepState)
     throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index e5d0ae1..37aed45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -21,6 +21,7 @@ package org.apache.giraph.bsp;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
@@ -237,4 +238,11 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    */
   void cleanup(FinishedSuperstepStats finishedSuperstepStats)
     throws IOException, InterruptedException;
+
+  /**
+   * Loads Global stats from zookeeper.
+   * @return global stats stored in zookeeper for
+   * previous superstep.
+   */
+  GlobalStats getGlobalStats();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java
new file mode 100644
index 0000000..74db490
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+/**
+ * Enum represents possible checkpoint state.
+ */
+public enum CheckpointStatus {
+  /** Do nothing, no checkpoint required */
+  NONE,
+  /** Regular checkpoint */
+  CHECKPOINT,
+  /** Do checkpoint and then halt further computation */
+  CHECKPOINT_AND_HALT
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
index c384fbf..768278b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
@@ -23,11 +23,33 @@ package org.apache.giraph.bsp;
  */
 public enum SuperstepState {
   /** Nothing happened yet */
-  INITIAL,
+  INITIAL(false),
   /** A worker died during this superstep */
-  WORKER_FAILURE,
+  WORKER_FAILURE(false),
   /** This superstep completed correctly */
-  THIS_SUPERSTEP_DONE,
+  THIS_SUPERSTEP_DONE(false),
   /** All supersteps are complete */
-  ALL_SUPERSTEPS_DONE,
+  ALL_SUPERSTEPS_DONE(true),
+  /** Execution halted */
+  CHECKPOINT_AND_HALT(true);
+
+  /** Should we stop execution after this superstep? */
+  private boolean executionComplete;
+
+  /**
+   * Enum constructor
+   * @param executionComplete is final state?
+   */
+  SuperstepState(boolean executionComplete) {
+    this.executionComplete = executionComplete;
+  }
+
+  /**
+   * Returns true if execution has to be stopped after this
+   * superstep.
+   * @return whether execution is complete.
+   */
+  public boolean isExecutionComplete() {
+    return executionComplete;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index a92cd1c..1fd85e4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -269,4 +269,14 @@ public class ServerData<I extends WritableComparable,
   public void addIncomingWorkerToWorkerMessage(Writable message) {
     incomingWorkerToWorkerMessages.add(message);
   }
+
+
+  /**
+   * Get worker to worker messages received in previous superstep.
+   * @return list of current worker to worker messages.
+   */
+  public List<Writable> getCurrentWorkerToWorkerMessages() {
+    return currentWorkerToWorkerMessages;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 0424a47..da0a8db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -1137,12 +1137,16 @@ public interface GiraphConstants {
       new IntConfOption("giraph.checkpoint.io.threads", 8,
           "Number of threads for writing and reading checkpoints");
 
-  /** Compression algorithm to be used for checkpointing */
+  /**
+   * Compression algorithm to be used for checkpointing.
+   * Defined by extension for hadoop compatibility reasons.
+  */
   StrConfOption CHECKPOINT_COMPRESSION_CODEC =
       new StrConfOption("giraph.checkpoint.compression.codec",
-          "org.apache.hadoop.io.compress.DefaultCodec",
+          ".deflate",
           "Defines compression algorithm we will be using for " +
-              "storing checkpoint");
+              "storing checkpoint. Available options include but " +
+              "not restricted to: .deflate, .gz, .bz2, .lzo");
 
   /** Number of threads to use in async message store, 0 means
    * we should not use async message processing */

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
index c351778..f7895a9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
@@ -17,6 +17,8 @@
  */
 package org.apache.giraph.graph;
 
+import org.apache.giraph.bsp.CheckpointStatus;
+
 /**
  * Immutable graph stats after the completion of a superstep
  */
@@ -27,6 +29,11 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
   private final boolean allVerticesHalted;
   /** Needs to load a checkpoint */
   private final boolean mustLoadCheckpoint;
+  /**
+   * Master decides when we need to checkpoint and what should
+   * we do next.
+   */
+  private final CheckpointStatus checkpointStatus;
 
   /**
    * Constructor.
@@ -36,16 +43,19 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
    * @param numVertices Number of vertices
    * @param numEdges Number of edges
    * @param mustLoadCheckpoint Has to load a checkpoint?
+   * @param checkpointStatus Should we checkpoint after this superstep?
    */
   public FinishedSuperstepStats(long numLocalVertices,
                                 boolean allVerticesHalted,
                                 long numVertices,
                                 long numEdges,
-                                boolean mustLoadCheckpoint) {
+                                boolean mustLoadCheckpoint,
+                                CheckpointStatus checkpointStatus) {
     super(numVertices, numEdges);
     this.localVertexCount = numLocalVertices;
     this.allVerticesHalted = allVerticesHalted;
     this.mustLoadCheckpoint = mustLoadCheckpoint;
+    this.checkpointStatus = checkpointStatus;
   }
 
   public long getLocalVertexCount() {
@@ -69,4 +79,12 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
   public boolean mustLoadCheckpoint() {
     return mustLoadCheckpoint;
   }
+
+  /**
+   * What master thinks about checkpointing after this superstep.
+   * @return CheckpointStatus that reflects master decision.
+   */
+  public CheckpointStatus getCheckpointStatus() {
+    return checkpointStatus;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
index bc56c9c..e11f02c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.giraph.bsp.CheckpointStatus;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.hadoop.io.Writable;
 
@@ -41,6 +42,12 @@ public class GlobalStats implements Writable {
   private long messageBytesCount = 0;
   /** Whether the computation should be halted */
   private boolean haltComputation = false;
+  /**
+   * Master's decision on whether we should checkpoint and
+   * what to do next.
+   */
+  private CheckpointStatus checkpointStatus =
+      CheckpointStatus.NONE;
 
   /**
    * Add the stats of a partition to the global stats.
@@ -81,6 +88,14 @@ public class GlobalStats implements Writable {
     haltComputation = value;
   }
 
+  public CheckpointStatus getCheckpointStatus() {
+    return checkpointStatus;
+  }
+
+  public void setCheckpointStatus(CheckpointStatus checkpointStatus) {
+    this.checkpointStatus = checkpointStatus;
+  }
+
   /**
    * Add messages to the global stats.
    *
@@ -107,6 +122,11 @@ public class GlobalStats implements Writable {
     messageCount = input.readLong();
     messageBytesCount = input.readLong();
     haltComputation = input.readBoolean();
+    if (input.readBoolean()) {
+      checkpointStatus = CheckpointStatus.values()[input.readInt()];
+    } else {
+      checkpointStatus = null;
+    }
   }
 
   @Override
@@ -117,6 +137,10 @@ public class GlobalStats implements Writable {
     output.writeLong(messageCount);
     output.writeLong(messageBytesCount);
     output.writeBoolean(haltComputation);
+    output.writeBoolean(checkpointStatus != null);
+    if (checkpointStatus != null) {
+      output.writeInt(checkpointStatus.ordinal());
+    }
   }
 
   @Override
@@ -124,6 +148,7 @@ public class GlobalStats implements Writable {
     return "(vtx=" + vertexCount + ",finVtx=" +
         finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
         messageCount + ",msgBytesCount=" +
-          messageBytesCount + ",haltComputation=" + haltComputation + ")";
+          messageBytesCount + ",haltComputation=" + haltComputation +
+        ", checkpointStatus=" + checkpointStatus + ')';
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 6ebb002..8a97939 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -21,6 +21,7 @@ package org.apache.giraph.graph;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.bsp.CheckpointStatus;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -120,7 +121,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN;
   /** Superstep stats */
   private FinishedSuperstepStats finishedSuperstepStats =
-      new FinishedSuperstepStats(0, false, 0, 0, false);
+      new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
 
   // Per-Job Metrics
   /** Timer for WorkerContext#preApplication() */
@@ -281,7 +282,18 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       context.progress();
       serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
       context.progress();
-      graphState = checkSuperstepRestarted(superstep, graphState);
+      boolean hasBeenRestarted = checkSuperstepRestarted(superstep);
+
+      GlobalStats globalStats = serviceWorker.getGlobalStats();
+
+      if (hasBeenRestarted) {
+        graphState = new GraphState(superstep,
+            finishedSuperstepStats.getVertexCount(),
+            finishedSuperstepStats.getEdgeCount(),
+            context);
+      } else if (storeCheckpoint(globalStats.getCheckpointStatus())) {
+        break;
+      }
       prepareForSuperstep(graphState);
       context.progress();
       MessageStore<I, Writable> messageStore =
@@ -735,11 +747,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   /**
    * Handle the event that this superstep is a restart of a failed one.
    * @param superstep current superstep
-   * @param graphState the BSP graph state
    * @return the graph state, updated if this is a restart superstep
    */
-  private GraphState checkSuperstepRestarted(long superstep,
-    GraphState graphState) throws IOException {
+  private boolean checkSuperstepRestarted(long superstep) throws IOException {
     // Might need to restart from another superstep
     // (manually or automatic), or store a checkpoint
     if (serviceWorker.getRestartedSuperstep() == superstep) {
@@ -750,15 +760,25 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
         serviceWorker.getRestartedSuperstep());
       finishedSuperstepStats = new FinishedSuperstepStats(0, false,
           vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
-          false);
-      graphState = new GraphState(superstep,
-          finishedSuperstepStats.getVertexCount(),
-          finishedSuperstepStats.getEdgeCount(),
-          context);
-    } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
+          false, CheckpointStatus.NONE);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Check if it's time to checkpoint and actually does checkpointing
+   * if it is.
+   * @param checkpointStatus master's decision
+   * @return true if we need to stop computation after checkpoint
+   * @throws IOException
+   */
+  private boolean storeCheckpoint(CheckpointStatus checkpointStatus)
+    throws IOException {
+    if (checkpointStatus != CheckpointStatus.NONE) {
       serviceWorker.storeCheckpoint();
     }
-    return graphState;
+    return checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
index 0cab86c..edf6bce 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
@@ -30,4 +30,9 @@ public class DefaultGiraphJobRetryChecker implements GiraphJobRetryChecker {
     // By default, don't retry failed jobs
     return false;
   }
+
+  @Override
+  public boolean shouldRestartCheckpoint() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 4a1f02e..436126b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -24,9 +24,13 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphMapper;
+import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
@@ -261,6 +265,25 @@ public class GiraphJob {
         jobProgressTracker.stop();
       }
       jobObserver.jobFinished(submittedJob, passed);
+
+      FileSystem fs = FileSystem.get(conf);
+      JobID jobID = HadoopUtils.getJobID(submittedJob);
+      if (jobID != null) {
+        Path checkpointMark =
+            CheckpointingUtils.getCheckpointMarkPath(conf, jobID.toString());
+
+        if (fs.exists(checkpointMark)) {
+          if (retryChecker.shouldRestartCheckpoint()) {
+            GiraphConstants.RESTART_JOB_ID.set(conf, jobID.toString());
+            continue;
+          }
+        }
+      } else {
+        LOG.warn("jobID is null, are you using hadoop 0.20.203? " +
+            "Please report this issue here " +
+            "https://issues.apache.org/jira/browse/GIRAPH-933");
+      }
+
       if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
         return passed;
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
index 53a800e..556b128 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
@@ -33,4 +33,10 @@ public interface GiraphJobRetryChecker {
    * @return True iff job should be retried
    */
   boolean shouldRetry(Job submittedJob, int tryCount);
+
+  /**
+   * The job has been checkpointed and halted. Should we now restart it?
+   * @return true if checkpointed job should be automatically restarted.
+   */
+  boolean shouldRestartCheckpoint();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java
index 9530fd6..f2c673b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.job;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -102,6 +103,20 @@ public class HadoopUtils {
   }
 
   /**
+   * Get Job ID from job.
+   * May return null for hadoop 0.20.203
+   * @param job submitted job
+   * @return JobId for submitted job.
+   */
+  public static JobID getJobID(Job job) {
+    /*if[HADOOP_JOB_ID_AVAILABLE]
+    return job.getID();
+    else[HADOOP_JOB_ID_AVAILABLE]*/
+    return job.getJobID();
+    /*end[HADOOP_JOB_ID_AVAILABLE]*/
+  }
+
+  /**
    * Create a JobContext, supporting many Hadoops.
    *
    * @param conf Configuration

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index e129390..671df23 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FilenameUtils;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.bsp.CheckpointStatus;
 import org.apache.giraph.bsp.SuperstepState;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.MasterServer;
@@ -56,6 +57,7 @@ import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 import org.apache.giraph.utils.ProgressableUtils;
@@ -67,10 +69,8 @@ import org.apache.giraph.worker.WorkerInfo;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -99,7 +99,6 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -189,6 +188,11 @@ public class BspServiceMaster<I extends WritableComparable,
   /** MasterCompute time */
   private GiraphTimer masterComputeTimer;
 
+  /** Checkpoint frequency */
+  private final int checkpointFrequency;
+  /** Current checkpoint status */
+  private CheckpointStatus checkpointStatus;
+
   /**
    * Constructor for setting up the master.
    *
@@ -224,6 +228,9 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     observers = conf.createMasterObservers();
 
+    this.checkpointFrequency = conf.getCheckpointFrequency();
+    this.checkpointStatus = CheckpointStatus.NONE;
+
     GiraphMetrics.get().addSuperstepResetObserver(this);
     GiraphStats.init((Mapper.Context) context);
   }
@@ -365,7 +372,11 @@ public class BspServiceMaster<I extends WritableComparable,
         @SuppressWarnings("deprecation")
         JobID jobId = JobID.forName(getJobId());
         RunningJob job = jobClient.getJob(jobId);
-        job.killJob();
+        if (job != null) {
+          job.killJob();
+        } else {
+          LOG.error("Job not found for jobId=" + getJobId());
+        }
       }
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
@@ -1196,11 +1207,11 @@ public class BspServiceMaster<I extends WritableComparable,
    *
    * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
    * @param chosenWorkerInfoList List of the healthy workers
-   * @return true if they are all alive, false otherwise.
+   * @return a list of dead workers. Empty list if all workers are alive.
    * @throws InterruptedException
    * @throws KeeperException
    */
-  private boolean superstepChosenWorkerAlive(
+  private Collection<WorkerInfo> superstepChosenWorkerAlive(
     String chosenWorkerInfoHealthPath,
     List<WorkerInfo> chosenWorkerInfoList)
     throws KeeperException, InterruptedException {
@@ -1208,16 +1219,13 @@ public class BspServiceMaster<I extends WritableComparable,
         getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
     Set<WorkerInfo> chosenWorkerInfoHealthySet =
         new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
-    boolean allChosenWorkersHealthy = true;
+    List<WorkerInfo> deadWorkers = new ArrayList<>();
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
       if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
-        allChosenWorkersHealthy = false;
-        LOG.error("superstepChosenWorkerAlive: Missing chosen " +
-            "worker " + chosenWorkerInfo +
-            " on superstep " + getSuperstep());
+        deadWorkers.add(chosenWorkerInfo);
       }
     }
-    return allChosenWorkersHealthy;
+    return deadWorkers;
   }
 
   @Override
@@ -1257,37 +1265,13 @@ public class BspServiceMaster<I extends WritableComparable,
     }
   }
 
-  /**
-   * Only get the finalized checkpoint files
-   */
-  public static class FinalizedCheckpointPathFilter implements PathFilter {
-    @Override
-    public boolean accept(Path path) {
-      return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX);
-    }
-  }
-
   @Override
   public long getLastGoodCheckpoint() throws IOException {
     // Find the last good checkpoint if none have been written to the
     // knowledge of this master
     if (lastCheckpointedSuperstep == -1) {
       try {
-        FileStatus[] fileStatusArray =
-            getFs().listStatus(new Path(savedCheckpointBasePath),
-                new FinalizedCheckpointPathFilter());
-        if (fileStatusArray == null) {
-          return -1;
-        }
-        Arrays.sort(fileStatusArray);
-        lastCheckpointedSuperstep = getCheckpoint(
-            fileStatusArray[fileStatusArray.length - 1].getPath());
-        if (LOG.isInfoEnabled()) {
-          LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
-              lastCheckpointedSuperstep + " from " +
-              fileStatusArray[fileStatusArray.length - 1].
-                  getPath().toString());
-        }
+        lastCheckpointedSuperstep = getLastCheckpointedSuperstep();
       } catch (IOException e) {
         LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
             "found, killing the job.", e);
@@ -1306,12 +1290,15 @@ public class BspServiceMaster<I extends WritableComparable,
    *        hostname and id
    * @param workerInfoList List of the workers to wait for
    * @param event Event to wait on for a chance to be done.
+   * @param ignoreDeath In case if worker died after making it through
+   *                    barrier, we will ignore death if set to true.
    * @return True if barrier was successful, false if there was a worker
    *         failure
    */
   private boolean barrierOnWorkerList(String finishedWorkerPath,
       List<WorkerInfo> workerInfoList,
-      BspEvent event) {
+      BspEvent event,
+      boolean ignoreDeath) {
     try {
       getZkExt().createOnceExt(finishedWorkerPath,
           null,
@@ -1339,6 +1326,7 @@ public class BspServiceMaster<I extends WritableComparable,
     final int defaultTaskTimeoutMsec = 10 * 60 * 1000;  // from TaskTracker
     final int taskTimeoutMsec = getContext().getConfiguration().getInt(
         "mapred.task.timeout", defaultTaskTimeoutMsec);
+    Set<WorkerInfo> deadWorkers = new HashSet<>(); // dedup across polls
     while (true) {
       try {
         finishedHostnameIdList =
@@ -1389,6 +1377,15 @@ public class BspServiceMaster<I extends WritableComparable,
         break;
       }
 
+      for (WorkerInfo deadWorker : deadWorkers) {
+        if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
+          LOG.error("barrierOnWorkerList: no results arrived from " +
+              "worker that was pronounced dead: " + deadWorker +
+              " on superstep " + getSuperstep());
+          return false;
+        }
+      }
+
       // Wait for a signal or timeout
       event.waitMsecs(taskTimeoutMsec / 2);
       event.reset();
@@ -1396,9 +1393,13 @@ public class BspServiceMaster<I extends WritableComparable,
 
       // Did a worker die?
       try {
-        if (!superstepChosenWorkerAlive(
+        deadWorkers.addAll(superstepChosenWorkerAlive(
                 workerInfoHealthyPath,
-                workerInfoList)) {
+                workerInfoList));
+        if (!ignoreDeath && deadWorkers.size() > 0) {
+          LOG.error("barrierOnWorkerList: Missing chosen " +
+              "workers " + deadWorkers +
+              " on superstep " + getSuperstep());
           return false;
         }
       } catch (KeeperException e) {
@@ -1462,7 +1463,8 @@ public class BspServiceMaster<I extends WritableComparable,
     String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
     if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
         chosenWorkerInfoList,
-        inputSplitEvents.getDoneStateChanged())) {
+        inputSplitEvents.getDoneStateChanged(),
+        false)) {
       throw new IllegalStateException(logPrefix + ": Worker failed during " +
           "input split (currently not supported)");
     }
@@ -1589,14 +1591,15 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // Finalize the valid checkpoint file prefixes and possibly
     // the aggregators.
-    if (checkpointFrequencyMet(getSuperstep())) {
+    if (checkpointStatus != CheckpointStatus.NONE) {
       String workerWroteCheckpointPath =
           getWorkerWroteCheckpointPath(getApplicationAttempt(),
               getSuperstep());
       // first wait for all the workers to write their checkpoint data
       if (!barrierOnWorkerList(workerWroteCheckpointPath,
           chosenWorkerInfoList,
-          getWorkerWroteCheckpointEvent())) {
+          getWorkerWroteCheckpointEvent(),
+          checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) {
         return SuperstepState.WORKER_FAILURE;
       }
       try {
@@ -1606,6 +1609,9 @@ public class BspServiceMaster<I extends WritableComparable,
             "coordinateSuperstep: IOException on finalizing checkpoint",
             e);
       }
+      if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) {
+        return SuperstepState.CHECKPOINT_AND_HALT;
+      }
     }
 
     if (getSuperstep() == INPUT_SUPERSTEP) {
@@ -1630,7 +1636,8 @@ public class BspServiceMaster<I extends WritableComparable,
         getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
     if (!barrierOnWorkerList(finishedWorkerPath,
         chosenWorkerInfoList,
-        getSuperstepStateChangedEvent())) {
+        getSuperstepStateChangedEvent(),
+        false)) {
       return SuperstepState.WORKER_FAILURE;
     }
 
@@ -1677,10 +1684,14 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     getConfiguration().updateSuperstepClasses(superstepClasses);
 
+    //Signal workers that we want to checkpoint
+    checkpointStatus = getCheckpointStatus(getSuperstep() + 1);
+    globalStats.setCheckpointStatus(checkpointStatus);
     // Let everyone know the aggregated application state through the
     // superstep finishing znode.
     String superstepFinishedNode =
         getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
+
     WritableUtils.writeToZnode(
         getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses);
     updateCounters(globalStats);
@@ -1703,6 +1714,43 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   /**
+   * Decide the checkpoint status of a superstep.  If the znode
+   * basePath + FORCE_CHECKPOINT_USER_FLAG exists: CHECKPOINT_AND_HALT.  If
+   * the frequency is non-zero, CHECKPOINT the first user superstep (after a
+   * restart: restarted superstep + frequency) and every frequency-th after.
+   * @param superstep Superstep to decide the checkpoint status for
+   * @return NONE, CHECKPOINT or CHECKPOINT_AND_HALT for this superstep
+   */
+  private CheckpointStatus getCheckpointStatus(long superstep) {
+    try {
+      if (getZkExt().
+          exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
+        return CheckpointStatus.CHECKPOINT_AND_HALT;
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getCheckpointStatus: Got KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getCheckpointStatus: Got InterruptedException", e);
+    }
+    if (checkpointFrequency == 0) {
+      return CheckpointStatus.NONE;
+    }
+    long firstCheckpoint = INPUT_SUPERSTEP + 1;
+    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+      firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
+    }
+    if (superstep < firstCheckpoint) {
+      return CheckpointStatus.NONE;
+    }
+    if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
+      return CheckpointStatus.CHECKPOINT;
+    }
+    return CheckpointStatus.NONE;
+  }
+
+  /**
    * This doMasterCompute is only called
    * after masterCompute is initialized
    */
@@ -1837,7 +1885,7 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   @Override
-  public void cleanup() throws IOException {
+  public void cleanup(SuperstepState superstepState) throws IOException {
     ImmutableClassesGiraphConfiguration conf = getConfiguration();
 
     // All master processes should denote they are done by adding special
@@ -1872,7 +1920,8 @@ public class BspServiceMaster<I extends WritableComparable,
       getGraphTaskManager().setIsMaster(true);
       cleanUpZooKeeper();
       // If desired, cleanup the checkpoint directory
-      if (GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
+      if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE &&
+          GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
         boolean success =
             getFs().delete(new Path(checkpointBasePath), true);
         if (LOG.isInfoEnabled()) {
@@ -1882,6 +1931,12 @@ public class BspServiceMaster<I extends WritableComparable,
               " succeeded ");
         }
       }
+      if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) {
+        getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf,
+            getJobId()), true).close(); // empty mark file; release stream
+        failJob(new Exception("Checkpoint and halt requested. " +
+            "Killing this job."));
+      }
       aggregatorHandler.close();
       masterClient.closeConnections();
       masterServer.close();

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index 0635210..8e4e0b8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -96,6 +96,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
       long initializeMillis = 0;
       long endMillis = 0;
       bspServiceMaster.setup();
+      SuperstepState superstepState = SuperstepState.INITIAL;
+
       if (bspServiceMaster.becomeMaster()) {
         // First call to checkWorkers waits for all pending resources.
         // If these resources are still available at subsequent calls it just
@@ -113,11 +115,9 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
           long setupMillis = System.currentTimeMillis() - initializeMillis;
           GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
           setupSecs = setupMillis / 1000.0d;
-          SuperstepState superstepState = SuperstepState.INITIAL;
-          long cachedSuperstep = BspService.UNSET_SUPERSTEP;
-          while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
+          while (!superstepState.isExecutionComplete()) {
             long startSuperstepMillis = System.currentTimeMillis();
-            cachedSuperstep = bspServiceMaster.getSuperstep();
+            long cachedSuperstep = bspServiceMaster.getSuperstep();
             GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
             Class<? extends Computation> computationClass =
                 bspServiceMaster.getMasterCompute().getComputation();
@@ -153,7 +153,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
           bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
         }
       }
-      bspServiceMaster.cleanup();
+      bspServiceMaster.cleanup(superstepState);
       if (!superstepSecsMap.isEmpty()) {
         GiraphTimers.getInstance().getShutdownMs().
           increment(System.currentTimeMillis() - endMillis);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
new file mode 100644
index 0000000..11d5e4f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
+
+/**
+ * Holds useful functions to get checkpoint paths
+ * in hdfs.
+ */
+public final class CheckpointingUtils {
+
+  /**
+   * Do not call constructor.
+   */
+  private CheckpointingUtils() {
+  }
+
+  /**
+   * Path to the checkpoint's root: the value of CHECKPOINT_DIRECTORY if it
+   * is set, otherwise its default value followed by "/" and the job id.
+   * @param conf Configuration of the job
+   * @param jobId job ID (only used when CHECKPOINT_DIRECTORY is not set)
+   * @return checkpoint's root
+   */
+  public static String getCheckpointBasePath(Configuration conf,
+                                             String jobId) {
+    return CHECKPOINT_DIRECTORY.getWithDefault(conf,
+        CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + jobId);
+  }
+
+  /**
+   * Path to the checkpoint-and-halt mark file: "halt" under
+   * {@link #getCheckpointBasePath}. It is created to let the client know
+   * that the master has successfully finished checkpointing and the job
+   * can be restarted.
+   * NOTE(review): when CHECKPOINT_DIRECTORY is set the job id is not part
+   * of this path, so jobs sharing that directory share one mark file.
+   * @param conf Configuration of the job
+   * @param jobId job ID
+   * @return path to the checkpoint-and-halt mark file in hdfs
+   */
+  public static Path getCheckpointMarkPath(Configuration conf,
+                                           String jobId) {
+    return new Path(getCheckpointBasePath(conf, jobId), "halt");
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 763f59d..3c5cbad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -676,4 +676,76 @@ public class WritableUtils {
       return null;
     }
   }
+
+  /**
+   * Writes a list of Writable objects (null elements allowed) into an
+   * output stream.  Class information is written only when an element's
+   * class differs from that of the previous non-null element, which saves
+   * space in the common case of lists holding a single type.
+   *
+   * @param list list to serialize; elements may be null
+   * @param output the output stream
+   * @throws IOException if writing to the output fails
+   */
+  public static void writeList(List<? extends Writable> list,
+      DataOutput output) throws IOException {
+    output.writeInt(list.size());
+    Class<? extends Writable> clazz = null;
+    for (Writable element : list) {
+      output.writeBoolean(element == null);
+      if (element != null) {
+        if (element.getClass() != clazz) {
+          clazz = element.getClass();
+          output.writeBoolean(true);
+          writeClass(clazz, output);
+        } else {
+          output.writeBoolean(false);
+        }
+        element.write(output);
+      }
+    }
+  }
+
+  /**
+   * Reads a list of Writable objects written by writeList.  Each non-null
+   * element is created with its class's no-argument constructor and then
+   * filled via readFields.
+   *
+   * @param input input stream
+   * @return deserialized list; may contain null elements
+   * @throws IOException if reading from the input fails
+   * @throws IllegalStateException if an element cannot be instantiated, or
+   *         a non-null element appears before any class information
+   */
+  public static List<Writable> readList(DataInput input) throws IOException {
+    int size = input.readInt();
+    List<Writable> res = new ArrayList<>(size);
+    Class<? extends Writable> clazz = null;
+    for (int i = 0; i < size; i++) {
+      boolean isNull = input.readBoolean();
+      if (isNull) {
+        res.add(null);
+        continue;
+      }
+      boolean hasClassInfo = input.readBoolean();
+      if (hasClassInfo) {
+        clazz = readClass(input);
+      }
+      if (clazz == null) {
+        // Corrupt or foreign stream: writeList always emits class info for
+        // the first non-null element, so fail clearly instead of an NPE.
+        throw new IllegalStateException("readList: element " + i +
+            " has no class information");
+      }
+      try {
+        Writable element = clazz.newInstance();
+        element.readFields(input);
+        res.add(element);
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new IllegalStateException("unable to instantiate object", e);
+      }
+    }
+    return res;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index d2d24ee..447bb6f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.bsp.CheckpointStatus;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
@@ -540,7 +541,8 @@ public class BspServiceWorker<I extends WritableComparable,
     // 6. Wait for superstep INPUT_SUPERSTEP to complete.
     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
       setCachedSuperstep(getRestartedSuperstep());
-      return new FinishedSuperstepStats(0, false, 0, 0, true);
+      return new FinishedSuperstepStats(0, false, 0, 0, true,
+          CheckpointStatus.NONE);
     }
 
     JSONObject jobState = getJobState();
@@ -557,7 +559,8 @@ public class BspServiceWorker<I extends WritableComparable,
                 getApplicationAttempt());
           }
           setRestartedSuperstep(getSuperstep());
-          return new FinishedSuperstepStats(0, false, 0, 0, true);
+          return new FinishedSuperstepStats(0, false, 0, 0, true,
+              CheckpointStatus.NONE);
         }
       } catch (JSONException e) {
         throw new RuntimeException(
@@ -946,7 +949,8 @@ public class BspServiceWorker<I extends WritableComparable,
         globalStats.getHaltComputation(),
         globalStats.getVertexCount(),
         globalStats.getEdgeCount(),
-        false);
+        false,
+        globalStats.getCheckpointStatus());
   }
 
   /**
@@ -1314,8 +1318,11 @@ public class BspServiceWorker<I extends WritableComparable,
     throws IOException, InterruptedException {
     workerClient.closeConnections();
     setCachedSuperstep(getSuperstep() - 1);
-    saveVertices(finishedSuperstepStats.getLocalVertexCount());
-    saveEdges();
+    if (finishedSuperstepStats.getCheckpointStatus() !=
+        CheckpointStatus.CHECKPOINT_AND_HALT) {
+      saveVertices(finishedSuperstepStats.getLocalVertexCount());
+      saveEdges();
+    }
     WorkerProgress.get().finishStoring();
     if (workerProgressWriter != null) {
       workerProgressWriter.stop();
@@ -1414,6 +1421,10 @@ public class BspServiceWorker<I extends WritableComparable,
 
     }
 
+    List<Writable> w2wMessages =
+        getServerData().getCurrentWorkerToWorkerMessages();
+    WritableUtils.writeList(w2wMessages, checkpointOutputStream);
+
     checkpointOutputStream.close();
 
     getFs().createNewFile(validFilePath);
@@ -1488,9 +1499,9 @@ public class BspServiceWorker<I extends WritableComparable,
 
     final CompressionCodec codec =
         new CompressionCodecFactory(getConfiguration())
-            .getCodecByClassName(
+            .getCodec(new Path(
                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
-                    .get(getConfiguration()));
+                    .get(getConfiguration())));
 
     long t0 = System.currentTimeMillis();
 
@@ -1559,9 +1570,9 @@ public class BspServiceWorker<I extends WritableComparable,
 
     final CompressionCodec codec =
         new CompressionCodecFactory(getConfiguration())
-            .getCodecByClassName(
+            .getCodec(new Path(
                 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
-                    .get(getConfiguration()));
+                    .get(getConfiguration())));
 
     long t0 = System.currentTimeMillis();
 
@@ -1660,6 +1671,10 @@ public class BspServiceWorker<I extends WritableComparable,
         getServerData().getCurrentMessageStore().readFieldsForPartition(
             checkpointStream, partitionId);
       }
+
+      List<Writable> w2wMessages = WritableUtils.readList(checkpointStream);
+      getServerData().getCurrentWorkerToWorkerMessages().addAll(w2wMessages);
+
       checkpointStream.close();
 
       if (LOG.isInfoEnabled()) {
@@ -1920,4 +1935,18 @@ else[HADOOP_NON_SECURE]*/
   public SuperstepOutput<I, V, E> getSuperstepOutput() {
     return superstepOutput;
   }
+
+  @Override
+  public GlobalStats getGlobalStats() {
+    GlobalStats globalStats = new GlobalStats();
+    if (getSuperstep() > Math.max(INPUT_SUPERSTEP, getRestartedSuperstep())) {
+      String superstepFinishedNode =
+          getSuperstepFinishedPath(getApplicationAttempt(),
+              getSuperstep() - 1);
+      WritableUtils.readFieldsFromZnode(
+          getZkExt(), superstepFinishedNode, false, null,
+          globalStats);
+    }
+    return globalStats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
new file mode 100644
index 0000000..c712b5a
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test case for WritableUtils.
+ */
+public class TestWritableUtils {
+
+  /**
+   * Tests readList and writeList functions in writable utils.
+   * @throws IOException
+   */
+  @Test
+  public void testListSerialization() throws IOException {
+    List<Writable> list = new ArrayList<>();
+    list.add(new LongWritable(1));
+    list.add(new LongWritable(2));
+    list.add(null);
+    list.add(new FloatWritable(3));
+    list.add(new FloatWritable(4));
+    list.add(new LongWritable(5));
+    list.add(new LongWritable(6));
+
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    WritableUtils.writeList(list, dos);
+    dos.close();
+
+    byte[] data = bos.toByteArray();
+
+    DataInputStream input =
+        new DataInputStream(new ByteArrayInputStream(data));
+
+    List<Writable> result = WritableUtils.readList(input);
+
+    Assert.assertEquals(list, result);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
index 2939af7..9502557 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
@@ -19,8 +19,10 @@
 package org.apache.giraph;
 
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.examples.SimpleSuperstepComputation;
@@ -29,17 +31,25 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.zk.ZooKeeperManager;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -56,13 +66,8 @@ public class TestCheckpointing extends BspCase {
       Logger.getLogger(TestCheckpointing.class);
   /** ID to be used with test job */
   public static final String TEST_JOB_ID = "test_job";
-  /**
-   * Compute will double check that we don't run supersteps
-   * lesser than specified by this key. That way we ensure that
-   * computation actually restarted and not recalculated from the
-   * beginning.
-   */
-  public static final String KEY_MIN_SUPERSTEP = "minimum.superstep";
+  /** Per-superstep test hook invoked from compute(); null disables it. */
+  private static SuperstepCallback SUPERSTEP_CALLBACK;
 
   /**
    * Create the test case
@@ -84,49 +89,45 @@ public class TestCheckpointing extends BspCase {
   public void testBspCheckpoint(boolean useAsyncMessageStore)
       throws IOException, InterruptedException, ClassNotFoundException {
     Path checkpointsDir = getTempPath("checkpointing");
-    Path outputPath = getTempPath(getCallingMethodName());
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(
-        CheckpointComputation.class);
-    conf.setWorkerContextClass(
-        CheckpointVertexWorkerContext.class);
-    conf.setMasterComputeClass(
-        CheckpointVertexMasterCompute.class);
-    conf.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
-    conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
-    conf.set("mapred.job.id", TEST_JOB_ID);
-    conf.set(KEY_MIN_SUPERSTEP, "0");
     if (useAsyncMessageStore) {
       GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.set(conf, 2);
     }
-    GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
 
-    GiraphConfiguration configuration = job.getConfiguration();
-    GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
-    GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
-    configuration.setCheckpointFrequency(2);
+    SUPERSTEP_CALLBACK = null;
 
-    assertTrue(job.run(true));
+    GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
+    conf.setCheckpointFrequency(2);
 
-    long idSum = 0;
-    if (!runningInDistributedMode()) {
-      FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
-          outputPath);
-      idSum = CheckpointVertexWorkerContext
-          .getFinalSum();
-      LOG.info("testBspCheckpoint: idSum = " + idSum +
-          " fileLen = " + fileStatus.getLen());
-    }
+    long idSum = runOriginalJob(checkpointsDir, conf);
+    // The final sum is only readable in local mode (0 is returned otherwise).
+    if (!runningInDistributedMode()) {
+      assertEquals(10, idSum);
+    }
+
+    SUPERSTEP_CALLBACK = new SuperstepCallback() {
+      @Override
+      public void superstep(long superstep,
+                            ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
+        if (superstep < 2) {
+          Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
+        }
+      }
+    };
+    runRestartedJob(checkpointsDir, conf, idSum, 2);
+  }
 
-    // Restart the test from superstep 2
-    LOG.info("testBspCheckpoint: Restarting from superstep 2" +
-        " with checkpoint path = " + checkpointsDir);
+  private void runRestartedJob(Path checkpointsDir, GiraphConfiguration conf, long idSum, long restartFrom) throws IOException, InterruptedException, ClassNotFoundException {
+    Path outputPath;
+    LOG.info("testBspCheckpoint: Restarting from the latest superstep " +
+        "with checkpoint path = " + checkpointsDir);
     outputPath = getTempPath("checkpointing_restarted");
 
     GiraphConstants.RESTART_JOB_ID.set(conf, TEST_JOB_ID);
     conf.set("mapred.job.id", "restarted_test_job");
-    conf.set(GiraphConstants.RESTART_SUPERSTEP, "2");
-    conf.set(KEY_MIN_SUPERSTEP, "2");
+    if (restartFrom >= 0) {
+      conf.set(GiraphConstants.RESTART_SUPERSTEP, Long.toString(restartFrom));
+    }
 
     GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
         conf, outputPath);
@@ -135,6 +136,8 @@ public class TestCheckpointing extends BspCase {
         checkpointsDir.toString());
 
     assertTrue(restartedJob.run(true));
+
+
     if (!runningInDistributedMode()) {
       long idSumRestarted =
           CheckpointVertexWorkerContext
@@ -145,6 +148,36 @@ public class TestCheckpointing extends BspCase {
     }
   }
 
+  /**
+   * Runs the original job from superstep 0, checkpointing into checkpointsDir.
+   * @param checkpointsDir Directory the job writes its checkpoints to
+   * @param conf Job configuration; computation classes and job id are set on it
+   * @return Final aggregated id sum, or 0 when running in distributed mode
+   */
+  private long runOriginalJob(Path checkpointsDir, GiraphConfiguration conf)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Path outputPath = getTempPath("checkpointing_original");
+    conf.setComputationClass(CheckpointComputation.class);
+    conf.setWorkerContextClass(CheckpointVertexWorkerContext.class);
+    conf.setMasterComputeClass(CheckpointVertexMasterCompute.class);
+    conf.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
+    conf.set("mapred.job.id", TEST_JOB_ID);
+    GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(job.getConfiguration(),
+        checkpointsDir.toString());
+    assertTrue(job.run(true));
+
+    long idSum = 0;
+    if (!runningInDistributedMode()) {
+      FileStatus fileStatus =
+          getSinglePartFileStatus(job.getConfiguration(), outputPath);
+      idSum = CheckpointVertexWorkerContext.getFinalSum();
+      LOG.info("testBspCheckpoint: idSum = " + idSum +
+          " fileLen = " + fileStatus.getLen());
+    }
+    return idSum;
+  }
 
   /**
    * Actual computation.
@@ -159,10 +192,6 @@ public class TestCheckpointing extends BspCase {
       CheckpointVertexWorkerContext workerContext = getWorkerContext();
       assertEquals(getSuperstep() + 1, workerContext.testValue);
 
-      if (getSuperstep() < getConf().getInt(KEY_MIN_SUPERSTEP, Integer.MAX_VALUE)){
-        fail("Should not be running compute on superstep " + getSuperstep());
-      }
-
       if (getSuperstep() > 4) {
         vertex.voteToHalt();
         return;
@@ -186,10 +215,76 @@ public class TestCheckpointing extends BspCase {
             EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
         vertex.addEdge(newEdge);
         sendMessage(edge.getTargetVertexId(), newEdgeValue);
+
       }
     }
   }
 
+  /** Forces a checkpoint at superstep 0, then restarts the job from it. */
+  @Test
+  public void testManualCheckpointAtTheBeginning()
+      throws InterruptedException, IOException, ClassNotFoundException {
+    testManualCheckpoint(0);
+  }
+  /** Forces a checkpoint at superstep 2, then restarts the job from it. */
+  @Test
+  public void testManualCheckpoint()
+      throws InterruptedException, IOException, ClassNotFoundException {
+    testManualCheckpoint(2);
+  }
+
+  private void testManualCheckpoint(final int checkpointSuperstep)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Path checkpointsDir = getTempPath("checkpointing");
+    GiraphConfiguration conf = new GiraphConfiguration();
+
+    // Force a checkpoint via ZooKeeper; the job must not run past it.
+    SUPERSTEP_CALLBACK = new SuperstepCallback() {
+      @Override
+      public void superstep(long superstep, ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
+        if (superstep == checkpointSuperstep) {
+          try {
+            ZooKeeperExt zooKeeperExt = new ZooKeeperExt(
+                conf.getZookeeperList(), conf.getZooKeeperSessionTimeout(),
+                conf.getZookeeperOpsMaxAttempts(),
+                conf.getZookeeperOpsRetryWaitMsecs(), TestCheckpointing.this);
+            String basePath = ZooKeeperManager.getBasePath(conf) + BspService.BASE_DIR + "/" + conf.get("mapred.job.id");
+            try {
+              zooKeeperExt.createExt(
+                  basePath + BspService.FORCE_CHECKPOINT_USER_FLAG, null,
+                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
+            } finally {
+              zooKeeperExt.close();
+            }
+          } catch (IOException | InterruptedException | KeeperException e) {
+            throw new RuntimeException(e);
+          }
+        } else if (superstep > checkpointSuperstep) {
+          Assert.fail("Job should be stopped by now " + superstep);
+        }
+      }
+    };
+
+    try {
+      runOriginalJob(checkpointsDir, conf);
+      fail("Original job should fail after checkpointing");
+    } catch (Exception e) {
+      LOG.info("Original job failed, that's OK " + e);
+    }
+
+    // Resume from the latest checkpoint; earlier supersteps must not rerun.
+    SUPERSTEP_CALLBACK = new SuperstepCallback() {
+      @Override
+      public void superstep(long superstep,
+                            ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
+        if (superstep < checkpointSuperstep) {
+          Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
+        }
+      }
+    };
+    runRestartedJob(checkpointsDir, conf, 10, -1);
+  }
+
   /**
    * Worker context associated.
    */
@@ -205,6 +300,21 @@ public class TestCheckpointing extends BspCase {
     }
 
     @Override
+    public void postSuperstep() {
+      super.postSuperstep();
+      // Checked by preSuperstep() at the start of the next superstep.
+      sendMessageToMyself(new LongWritable(getSuperstep()));
+    }
+
+    /**
+     * Sends a message to this worker itself (its own worker index).
+     * @param message Message to send
+     */
+    private void sendMessageToMyself(Writable message) {
+      sendMessageToWorker(message, getMyWorkerIndex());
+    }
+
+    @Override
     public void postApplication() {
       setFinalSum(this.<LongWritable>getAggregatedValue(
           LongSumAggregator.class.getName()).get());
@@ -223,6 +333,11 @@ public class TestCheckpointing extends BspCase {
     @Override
     public void preSuperstep() {
       assertEquals(getSuperstep(), testValue++);
+      if (getSuperstep() > 0) { // expect the message postSuperstep() sent last step
+        List<Writable> received = getAndClearMessagesFromOtherWorkers();
+        assertEquals(1, received.size());
+        assertEquals(getSuperstep() - 1, ((LongWritable) received.get(0)).get());
+      }
     }
 
     @Override
@@ -249,6 +364,9 @@ public class TestCheckpointing extends BspCase {
     @Override
     public void compute() {
       long superstep = getSuperstep();
+      if (SUPERSTEP_CALLBACK != null) { // hook installed by the running test
+        SUPERSTEP_CALLBACK.superstep(superstep, getConf());
+      }
       assertEquals(superstep, testValue++);
     }
 
@@ -272,6 +390,12 @@ public class TestCheckpointing extends BspCase {
     }
   }
 
+  private interface SuperstepCallback {
 
+    /** Called with the current superstep number and the job configuration. */
+    void superstep(long superstep,
+                   ImmutableClassesGiraphConfiguration<LongWritable,
+                       IntWritable, FloatWritable> conf);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 672ec44..b4d78ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1059,7 +1059,7 @@ under the License.
       </modules>
       <properties>
         <hadoop.version>0.20.0</hadoop.version>
-        <munge.symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</munge.symbols>
+        <munge.symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_JOB_ID_AVAILABLE</munge.symbols>
       </properties>
       <dependencies>
         <!-- sorted lexicographically -->