You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/10/20 19:16:32 UTC

git commit: updated refs/heads/trunk to 54a1a8d

Repository: giraph
Updated Branches:
  refs/heads/trunk d32c429a1 -> 54a1a8ded


Auto-restart from checkpoint doesn't pick up latest checkpoint

Summary:
While running different jobs with checkpoints enabled, I noticed some issues:
1) The way we pick the latest checkpoint is not correct. The current implementation sorts the finalized-checkpoint entries returned by FileSystem.listStatus() by path and picks the last one; because that ordering is lexicographic, the result is not necessarily the latest checkpoint (e.g. "9.finalized" sorts after "10.finalized").
2) If a job restarts from a checkpoint, it immediately creates another checkpoint.
3) We need more flexibility in GiraphJobRetryChecker to allow restarts after multiple failures.

Test Plan: Run our production jobs with checkpointing

Reviewers: majakabiljo, pavanka, pavanka.26, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D23913


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/54a1a8de
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/54a1a8de
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/54a1a8de

Branch: refs/heads/trunk
Commit: 54a1a8dedbd4af64ca8079da69878f89a69d9e5b
Parents: d32c429
Author: Sergey Edunov <ed...@fb.com>
Authored: Mon Oct 20 10:08:56 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Oct 20 10:15:57 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/giraph/bsp/BspService.java  |  72 +---------
 .../job/DefaultGiraphJobRetryChecker.java       |   5 +-
 .../java/org/apache/giraph/job/GiraphJob.java   |  24 +---
 .../giraph/job/GiraphJobRetryChecker.java       |   9 +-
 .../apache/giraph/master/BspServiceMaster.java  |  10 +-
 .../apache/giraph/utils/CheckpointingUtils.java |  94 +++++++++++++
 .../org/apache/giraph/utils/WritableUtils.java  | 139 +++++++++++++++----
 .../apache/giraph/worker/BspServiceWorker.java  |  32 +++--
 .../apache/giraph/utils/TestWritableUtils.java  |  40 +++++-
 9 files changed, 283 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/54a1a8de/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 2a50489..579c772 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -31,10 +31,7 @@ import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.giraph.zk.ZooKeeperManager;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -52,9 +49,7 @@ import org.json.JSONObject;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
-import java.security.InvalidParameterException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -190,28 +185,7 @@ public abstract class BspService<I extends WritableComparable,
   public static final String WORKER_SUFFIX = "_worker";
   /** Suffix denotes a master */
   public static final String MASTER_SUFFIX = "_master";
-  /** If at the end of a checkpoint file, indicates metadata */
-  public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata";
-  /**
-   * If at the end of a checkpoint file, indicates vertices, edges,
-   * messages, etc.
-   */
-  public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices";
-  /**
-   * If at the end of a checkpoint file, indicates metadata and data is valid
-   * for the same filenames without .valid
-   */
-  public static final String CHECKPOINT_VALID_POSTFIX = ".valid";
-  /**
-   * If at the end of a checkpoint file,
-   * indicates that we store WorkerContext and aggregator handler data.
-   */
-  public static final String CHECKPOINT_DATA_POSTFIX = ".data";
-  /**
-   * If at the end of a checkpoint file, indicates the stitched checkpoint
-   * file prefixes.  A checkpoint is not valid if this file does not exist.
-   */
-  public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized";
+
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(BspService.class);
   /** Path to the job's root */
@@ -606,22 +580,6 @@ public abstract class BspService<I extends WritableComparable,
     return savedCheckpointBasePath + "/" + superstep;
   }
 
-  /**
-   * Get the checkpoint from a finalized checkpoint path
-   *
-   * @param finalizedPath Path of the finalized checkpoint
-   * @return Superstep referring to a checkpoint of the finalized path
-   */
-  public static long getCheckpoint(Path finalizedPath) {
-    if (!finalizedPath.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX)) {
-      throw new InvalidParameterException(
-          "getCheckpoint: " + finalizedPath + "Doesn't end in " +
-              CHECKPOINT_FINALIZED_POSTFIX);
-    }
-    String checkpointString =
-        finalizedPath.getName().replace(CHECKPOINT_FINALIZED_POSTFIX, "");
-    return Long.parseLong(checkpointString);
-  }
 
   /**
    * Get the ZooKeeperExt instance.
@@ -1235,22 +1193,8 @@ public abstract class BspService<I extends WritableComparable,
    * @throws IOException
    */
   protected long getLastCheckpointedSuperstep() throws IOException {
-    FileStatus[] fileStatusArray =
-        getFs().listStatus(new Path(savedCheckpointBasePath),
-            new FinalizedCheckpointPathFilter());
-    if (fileStatusArray == null) {
-      return -1;
-    }
-    Arrays.sort(fileStatusArray);
-    long lastCheckpointedSuperstep = getCheckpoint(
-        fileStatusArray[fileStatusArray.length - 1].getPath());
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
-          lastCheckpointedSuperstep + " from " +
-          fileStatusArray[fileStatusArray.length - 1].
-              getPath().toString());
-    }
-    return lastCheckpointedSuperstep;
+    return CheckpointingUtils.getLastCheckpointedSuperstep(getFs(),
+        savedCheckpointBasePath);
   }
 
   @Override
@@ -1258,15 +1202,5 @@ public abstract class BspService<I extends WritableComparable,
     return getGraphTaskManager().getJobProgressTracker();
   }
 
-  /**
-   * Only get the finalized checkpoint files
-   */
-  private static class FinalizedCheckpointPathFilter implements PathFilter {
-    @Override
-    public boolean accept(Path path) {
-      return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX);
-    }
-
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/54a1a8de/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
index edf6bce..405bde3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
@@ -32,7 +32,8 @@ public class DefaultGiraphJobRetryChecker implements GiraphJobRetryChecker {
   }
 
   @Override
-  public boolean shouldRestartCheckpoint() {
-    return false;
+  public String shouldRestartCheckpoint(Job lastAttempt) {
+    return null;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/54a1a8de/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index ca1ad1c..11ae7fc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -24,13 +24,9 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphMapper;
-import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
@@ -276,22 +272,12 @@ public class GiraphJob {
       }
       jobObserver.jobFinished(submittedJob, passed);
 
-      FileSystem fs = FileSystem.get(conf);
-      JobID jobID = HadoopUtils.getJobID(submittedJob);
-      if (jobID != null) {
-        Path checkpointMark =
-            CheckpointingUtils.getCheckpointMarkPath(conf, jobID.toString());
-
-        if (fs.exists(checkpointMark)) {
-          if (retryChecker.shouldRestartCheckpoint()) {
-            GiraphConstants.RESTART_JOB_ID.set(conf, jobID.toString());
-            continue;
-          }
+      if (!passed) {
+        String restartFrom = retryChecker.shouldRestartCheckpoint(submittedJob);
+        if (restartFrom != null) {
+          GiraphConstants.RESTART_JOB_ID.set(conf, restartFrom);
+          continue;
         }
-      } else {
-        LOG.warn("jobID is null, are you using hadoop 0.20.203? " +
-            "Please report this issue here " +
-            "https://issues.apache.org/jira/browse/GIRAPH-933");
       }
 
       if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/54a1a8de/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
index 556b128..b213c51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
@@ -20,6 +20,9 @@ package org.apache.giraph.job;
 
 import org.apache.hadoop.mapreduce.Job;
 
+import java.io.IOException;
+
+
 /**
  * Class to decide whether a GiraphJob should be restarted after failure.
  */
@@ -36,7 +39,9 @@ public interface GiraphJobRetryChecker {
 
   /**
    * The job has been checkpointed and halted. Should we now restart it?
-   * @return true if checkpointed job should be automatically restarted.
+   * @param lastAttempt latest failed job
+   * @return JobID of job to be restarted or null if
+   * we don't want to restart anything
    */
-  boolean shouldRestartCheckpoint();
+  String shouldRestartCheckpoint(Job lastAttempt) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/54a1a8de/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index af7e5fd..62b089c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -772,8 +772,8 @@ public class BspServiceMaster<I extends WritableComparable,
     throws IOException, KeeperException, InterruptedException {
     List<PartitionOwner> partitionOwners = new ArrayList<>();
     FileSystem fs = getFs();
-    String finalizedCheckpointPath =
-        getSavedCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+    String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
+        CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
     LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
     DataInputStream finalizedStream =
         fs.open(new Path(finalizedCheckpointPath));
@@ -796,7 +796,7 @@ public class BspServiceMaster<I extends WritableComparable,
       int mrTaskId = finalizedStream.readInt();
 
       DataInputStream metadataStream = fs.open(new Path(checkpointFile +
-          "." + mrTaskId + CHECKPOINT_METADATA_POSTFIX));
+          "." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX));
       long partitions = metadataStream.readInt();
       WorkerInfo worker = workersMap.get(mrTaskId);
       for (long p = 0; p < partitions; ++p) {
@@ -1081,7 +1081,7 @@ public class BspServiceMaster<I extends WritableComparable,
     throws IOException, KeeperException, InterruptedException {
     Path finalizedCheckpointPath =
         new Path(getCheckpointBasePath(superstep) +
-            CHECKPOINT_FINALIZED_POSTFIX);
+            CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX);
     try {
       getFs().delete(finalizedCheckpointPath, false);
     } catch (IOException e) {
@@ -1761,7 +1761,7 @@ public class BspServiceMaster<I extends WritableComparable,
     if (checkpointFrequency == 0) {
       return CheckpointStatus.NONE;
     }
-    long firstCheckpoint = INPUT_SUPERSTEP + 1;
+    long firstCheckpoint = INPUT_SUPERSTEP + 1 + checkpointFrequency;
     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
       firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/54a1a8de/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
index 11d5e4f..f431260 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
@@ -19,7 +19,14 @@
 package org.apache.giraph.utils;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
 
 import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
 
@@ -29,6 +36,32 @@ import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
  */
 public class CheckpointingUtils {
 
+  /** If at the end of a checkpoint file, indicates metadata */
+  public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata";
+  /**
+   * If at the end of a checkpoint file, indicates vertices, edges,
+   * messages, etc.
+   */
+  public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices";
+  /**
+   * If at the end of a checkpoint file, indicates metadata and data is valid
+   * for the same filenames without .valid
+   */
+  public static final String CHECKPOINT_VALID_POSTFIX = ".valid";
+  /**
+   * If at the end of a checkpoint file,
+   * indicates that we store WorkerContext and aggregator handler data.
+   */
+  public static final String CHECKPOINT_DATA_POSTFIX = ".data";
+  /**
+   * If at the end of a checkpoint file, indicates the stitched checkpoint
+   * file prefixes.  A checkpoint is not valid if this file does not exist.
+   */
+  public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized";
+
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(CheckpointingUtils.class);
+
   /**
    * Do not call constructor.
    */
@@ -59,4 +92,65 @@ public class CheckpointingUtils {
                                            String jobId) {
     return new Path(getCheckpointBasePath(conf, jobId), "halt");
   }
+
+  /**
+   * Get the last saved superstep.
+   *
+   * @param fs file system where checkpoint is stored.
+   * @param checkpointBasePath path to checkpoints folder
+   * @return Last good superstep number
+   * @throws java.io.IOException
+   */
+  public static long getLastCheckpointedSuperstep(
+      FileSystem fs, String checkpointBasePath) throws IOException {
+    FileStatus[] fileStatusArray =
+        fs.listStatus(new Path(checkpointBasePath),
+            new FinalizedCheckpointPathFilter());
+    if (fileStatusArray == null) {
+      return -1;
+    }
+    long lastCheckpointedSuperstep = Long.MIN_VALUE;
+    for (FileStatus file : fileStatusArray) {
+      long superstep = getCheckpoint(file);
+      if (superstep > lastCheckpointedSuperstep) {
+        lastCheckpointedSuperstep = superstep;
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
+          lastCheckpointedSuperstep);
+    }
+    return lastCheckpointedSuperstep;
+  }
+
+  /**
+   * Get the checkpoint from a finalized checkpoint path
+   *
+   * @param finalizedPath Path of the finalized checkpoint
+   * @return Superstep referring to a checkpoint of the finalized path
+   */
+  private static long getCheckpoint(FileStatus finalizedPath) {
+    if (!finalizedPath.getPath().getName().
+        endsWith(CHECKPOINT_FINALIZED_POSTFIX)) {
+      throw new InvalidParameterException(
+          "getCheckpoint: " + finalizedPath + "Doesn't end in " +
+              CHECKPOINT_FINALIZED_POSTFIX);
+    }
+    String checkpointString =
+        finalizedPath.getPath().getName().
+            replace(CHECKPOINT_FINALIZED_POSTFIX, "");
+    return Long.parseLong(checkpointString);
+  }
+
+
+  /**
+   * Only get the finalized checkpoint files
+   */
+  private static class FinalizedCheckpointPathFilter implements PathFilter {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX);
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/54a1a8de/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 8c24697..0081fc0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -686,21 +686,24 @@ public class WritableUtils {
    * @param output the output stream
    * @throws IOException
    */
-  public static void writeList(List<Writable> list, DataOutput output)
+  public static void writeList(List<? extends Writable> list, DataOutput output)
     throws IOException {
-    output.writeInt(list.size());
-    Class<? extends Writable> clazz = null;
-    for (Writable element : list) {
-      output.writeBoolean(element == null);
-      if (element != null) {
-        if (element.getClass() != clazz) {
-          clazz = element.getClass();
-          output.writeBoolean(true);
-          writeClass(clazz, output);
-        } else {
-          output.writeBoolean(false);
+    output.writeBoolean(list != null);
+    if (list != null) {
+      output.writeInt(list.size());
+      Class<? extends Writable> clazz = null;
+      for (Writable element : list) {
+        output.writeBoolean(element == null);
+        if (element != null) {
+          if (element.getClass() != clazz) {
+            clazz = element.getClass();
+            output.writeBoolean(true);
+            writeClass(clazz, output);
+          } else {
+            output.writeBoolean(false);
+          }
+          element.write(output);
         }
-        element.write(output);
       }
     }
   }
@@ -712,24 +715,27 @@ public class WritableUtils {
    * @return deserialized list
    * @throws IOException
    */
-  public static List<Writable> readList(DataInput input) throws IOException {
+  public static List<? extends Writable> readList(DataInput input)
+    throws IOException {
     try {
-
-      int size = input.readInt();
-      List<Writable> res = new ArrayList<>(size);
-      Class<? extends Writable> clazz = null;
-      for (int i = 0; i < size; i++) {
-        boolean isNull = input.readBoolean();
-        if (isNull) {
-          res.add(null);
-        } else {
-          boolean hasClassInfo = input.readBoolean();
-          if (hasClassInfo) {
-            clazz = readClass(input);
+      List<Writable> res = null;
+      if (input.readBoolean()) {
+        int size = input.readInt();
+        res = new ArrayList<>(size);
+        Class<? extends Writable> clazz = null;
+        for (int i = 0; i < size; i++) {
+          boolean isNull = input.readBoolean();
+          if (isNull) {
+            res.add(null);
+          } else {
+            boolean hasClassInfo = input.readBoolean();
+            if (hasClassInfo) {
+              clazz = readClass(input);
+            }
+            Writable element = clazz.newInstance();
+            element.readFields(input);
+            res.add(element);
           }
-          Writable element = clazz.newInstance();
-          element.readFields(input);
-          res.add(element);
         }
       }
       return res;
@@ -775,4 +781,79 @@ public class WritableUtils {
     return copy;
   }
 
+  /**
+   * Writes primitive int array of ints into output stream.
+   * Array can be null or empty.
+   * @param array array to be written
+   * @param dataOutput output stream
+   * @throws IOException
+   */
+  public static void writeIntArray(int[] array, DataOutput dataOutput)
+    throws IOException {
+    if (array != null) {
+      dataOutput.writeInt(array.length);
+      for (int r : array) {
+        dataOutput.writeInt(r);
+      }
+    } else {
+      dataOutput.writeInt(-1);
+    }
+  }
+
+  /**
+   * Reads primitive int array from input stream.
+   * @param dataInput input stream to read from
+   * @return may return null or empty array.
+   * @throws IOException
+   */
+  public static int[] readIntArray(DataInput dataInput)
+    throws IOException {
+    int [] res = null;
+    int size = dataInput.readInt();
+    if (size >= 0) {
+      res = new int[size];
+      for (int i = 0; i < size; i++) {
+        res[i] = dataInput.readInt();
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Writes primitive long array of ints into output stream.
+   * Array can be null or empty.
+   * @param array array to be written
+   * @param dataOutput output stream
+   * @throws IOException
+   */
+  public static void writeLongArray(DataOutput dataOutput, long[] array)
+    throws IOException {
+    if (array != null) {
+      dataOutput.writeInt(array.length);
+      for (long r : array) {
+        dataOutput.writeLong(r);
+      }
+    } else {
+      dataOutput.writeInt(-1);
+    }
+  }
+  /**
+   * Reads primitive long array from input stream.
+   * @param dataInput input stream to read from
+   * @return may return null or empty array.
+   * @throws IOException
+   */
+  public static long[] readLongArray(DataInput dataInput)
+    throws IOException {
+    long [] res = null;
+    int size = dataInput.readInt();
+    if (size >= 0) {
+      res = new long[size];
+      for (int i = 0; i < size; i++) {
+        res[i] = dataInput.readLong();
+      }
+    }
+    return res;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/54a1a8de/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index f61e817..4ad8400 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -86,6 +86,7 @@ import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.partition.WorkerGraphPartitioner;
 import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
@@ -1388,12 +1389,12 @@ public class BspServiceWorker<I extends WritableComparable,
 
     // Algorithm:
     // For each partition, dump vertices and messages
-    Path metadataFilePath =
-        createCheckpointFilePathSafe(CHECKPOINT_METADATA_POSTFIX);
-    Path validFilePath =
-        createCheckpointFilePathSafe(CHECKPOINT_VALID_POSTFIX);
-    Path checkpointFilePath =
-        createCheckpointFilePathSafe(CHECKPOINT_DATA_POSTFIX);
+    Path metadataFilePath = createCheckpointFilePathSafe(
+        CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
+    Path validFilePath = createCheckpointFilePathSafe(
+        CheckpointingUtils.CHECKPOINT_VALID_POSTFIX);
+    Path checkpointFilePath = createCheckpointFilePathSafe(
+        CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
 
 
     // Metadata is buffered and written at the end since it's small and
@@ -1521,7 +1522,7 @@ public class BspServiceWorker<I extends WritableComparable,
               }
               Path path =
                   createCheckpointFilePathSafe("_" + partitionId +
-                      CHECKPOINT_VERTICES_POSTFIX);
+                      CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
 
               FSDataOutputStream uncompressedStream =
                   getFs().create(path);
@@ -1592,7 +1593,7 @@ public class BspServiceWorker<I extends WritableComparable,
               }
               Path path =
                   getSavedCheckpoint(superstep, "_" + partitionId +
-                      CHECKPOINT_VERTICES_POSTFIX);
+                      CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
 
               FSDataInputStream compressedStream =
                   getFs().open(path);
@@ -1626,11 +1627,11 @@ public class BspServiceWorker<I extends WritableComparable,
 
   @Override
   public VertexEdgeCount loadCheckpoint(long superstep) {
-    Path metadataFilePath =
-        getSavedCheckpoint(superstep, CHECKPOINT_METADATA_POSTFIX);
+    Path metadataFilePath = getSavedCheckpoint(
+        superstep, CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
 
-    Path checkpointFilePath =
-        getSavedCheckpoint(superstep, CHECKPOINT_DATA_POSTFIX);
+    Path checkpointFilePath = getSavedCheckpoint(
+        superstep, CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
     // Algorithm:
     // Examine all the partition owners and load the ones
     // that match my hostname and id from the master designated checkpoint
@@ -1659,8 +1660,8 @@ public class BspServiceWorker<I extends WritableComparable,
       // Load global stats and superstep classes
       GlobalStats globalStats = new GlobalStats();
       SuperstepClasses superstepClasses = new SuperstepClasses();
-      String finalizedCheckpointPath =
-          getSavedCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+      String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
+          CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
       DataInputStream finalizedStream =
           getFs().open(new Path(finalizedCheckpointPath));
       globalStats.readFields(finalizedStream);
@@ -1674,7 +1675,8 @@ public class BspServiceWorker<I extends WritableComparable,
             checkpointStream, partitionId);
       }
 
-      List<Writable> w2wMessages = WritableUtils.readList(checkpointStream);
+      List<Writable> w2wMessages = (List<Writable>) WritableUtils.readList(
+          checkpointStream);
       getServerData().getCurrentWorkerToWorkerMessages().addAll(w2wMessages);
 
       checkpointStream.close();

http://git-wip-us.apache.org/repos/asf/giraph/blob/54a1a8de/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
index c712b5a..8c97b69 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
@@ -61,10 +61,48 @@ public class TestWritableUtils {
     DataInputStream input =
         new DataInputStream(new ByteArrayInputStream(data));
 
-    List<Writable> result = WritableUtils.readList(input);
+    List<Writable> result = (List<Writable>) WritableUtils.readList(input);
 
     Assert.assertEquals(list, result);
 
   }
 
+  @Test
+  public void testIntArray() throws IOException {
+    int[] array = new int[] {1, 2, 3, 4, 5};
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    WritableUtils.writeIntArray(array, dos);
+    dos.close();
+
+    byte[] data = bos.toByteArray();
+
+    DataInputStream input =
+        new DataInputStream(new ByteArrayInputStream(data));
+
+    int[] result = WritableUtils.readIntArray(input);
+
+    Assert.assertArrayEquals(array, result);
+  }
+
+  @Test
+  public void testLongArray() throws IOException {
+    long[] array = new long[] {1, 2, 3, 4, 5};
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    WritableUtils.writeLongArray(dos, array);
+    dos.close();
+
+    byte[] data = bos.toByteArray();
+
+    DataInputStream input =
+        new DataInputStream(new ByteArrayInputStream(data));
+
+    long[] result = WritableUtils.readLongArray(input);
+
+    Assert.assertArrayEquals(array, result);
+  }
+
+
+
 }