You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [8/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Thu Feb 16 22:12:31 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.graph;
 import net.iharder.Base64;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
-import org.apache.giraph.bsp.CentralizedService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.SuperstepState;
 import org.apache.giraph.graph.GraphMapper.MapFunctions;
@@ -75,1663 +74,1678 @@ import org.apache.giraph.utils.WritableU
 
 /**
  * ZooKeeper-based implementation of {@link CentralizedService}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public class BspServiceMaster<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable, M extends Writable>
-        extends BspService<I, V, E, M>
-        implements CentralizedServiceMaster<I, V, E, M> {
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
-    /** Superstep counter */
-    private Counter superstepCounter = null;
-    /** Vertex counter */
-    private Counter vertexCounter = null;
-    /** Finished vertex counter */
-    private Counter finishedVertexCounter = null;
-    /** Edge counter */
-    private Counter edgeCounter = null;
-    /** Sent messages counter */
-    private Counter sentMessagesCounter = null;
-    /** Workers on this superstep */
-    private Counter currentWorkersCounter = null;
-    /** Current master task partition */
-    private Counter currentMasterTaskPartitionCounter = null;
-    /** Last checkpointed superstep */
-    private Counter lastCheckpointedSuperstepCounter = null;
-    /** Am I the master? */
-    private boolean isMaster = false;
-    /** Max number of workers */
-    private final int maxWorkers;
-    /** Min number of workers */
-    private final int minWorkers;
-    /** Min % responded workers */
-    private final float minPercentResponded;
-    /** Poll period in msecs */
-    private final int msecsPollPeriod;
-    /** Max number of poll attempts */
-    private final int maxPollAttempts;
-    /** Min number of long tails before printing */
-    private final int partitionLongTailMinPrint;
-    /** Last finalized checkpoint */
-    private long lastCheckpointedSuperstep = -1;
-    /** State of the superstep changed */
-    private final BspEvent superstepStateChanged =
-        new PredicateLock();
-    /** Master graph partitioner */
-    private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
-    /** All the partition stats from the last superstep */
-    private final List<PartitionStats> allPartitionStatsList =
-        new ArrayList<PartitionStats>();
-    /** Counter group name for the Giraph statistics */
-    public String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
-    /** Aggregator writer */
-    public AggregatorWriter aggregatorWriter;
-
-    public BspServiceMaster(
-            String serverPortList,
-            int sessionMsecTimeout,
-            Mapper<?, ?, ?, ?>.Context context,
-            GraphMapper<I, V, E, M> graphMapper) {
-        super(serverPortList, sessionMsecTimeout, context, graphMapper);
-        registerBspEvent(superstepStateChanged);
-
-        maxWorkers =
-            getConfiguration().getInt(GiraphJob.MAX_WORKERS, -1);
-        minWorkers =
-            getConfiguration().getInt(GiraphJob.MIN_WORKERS, -1);
-        minPercentResponded =
-            getConfiguration().getFloat(GiraphJob.MIN_PERCENT_RESPONDED,
-                                        100.0f);
-        msecsPollPeriod =
-            getConfiguration().getInt(GiraphJob.POLL_MSECS,
-                                      GiraphJob.POLL_MSECS_DEFAULT);
-        maxPollAttempts =
-            getConfiguration().getInt(GiraphJob.POLL_ATTEMPTS,
-                                      GiraphJob.POLL_ATTEMPTS_DEFAULT);
-        partitionLongTailMinPrint = getConfiguration().getInt(
-            GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT,
-            GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
-        masterGraphPartitioner =
-            getGraphPartitionerFactory().createMasterGraphPartitioner();
-    }
-
-    @Override
-    public void setJobState(ApplicationState state,
-                            long applicationAttempt,
-                            long desiredSuperstep) {
-        JSONObject jobState = new JSONObject();
-        try {
-            jobState.put(JSONOBJ_STATE_KEY, state.toString());
-            jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
-            jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
-        } catch (JSONException e) {
-            throw new RuntimeException("setJobState: Coudn't put " +
-                                       state.toString());
-        }
+public class BspServiceMaster<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends BspService<I, V, E, M>
+    implements CentralizedServiceMaster<I, V, E, M> {
+  /** Counter group name for the Giraph statistics */
+  public static final String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
+  /** Superstep counter */
+  private Counter superstepCounter = null;
+  /** Vertex counter */
+  private Counter vertexCounter = null;
+  /** Finished vertex counter */
+  private Counter finishedVertexCounter = null;
+  /** Edge counter */
+  private Counter edgeCounter = null;
+  /** Sent messages counter */
+  private Counter sentMessagesCounter = null;
+  /** Workers on this superstep */
+  private Counter currentWorkersCounter = null;
+  /** Current master task partition */
+  private Counter currentMasterTaskPartitionCounter = null;
+  /** Last checkpointed superstep */
+  private Counter lastCheckpointedSuperstepCounter = null;
+  /** Am I the master? */
+  private boolean isMaster = false;
+  /** Max number of workers */
+  private final int maxWorkers;
+  /** Min number of workers */
+  private final int minWorkers;
+  /** Min % responded workers */
+  private final float minPercentResponded;
+  /** Poll period in msecs */
+  private final int msecsPollPeriod;
+  /** Max number of poll attempts */
+  private final int maxPollAttempts;
+  /** Min number of long tails before printing */
+  private final int partitionLongTailMinPrint;
+  /** Last finalized checkpoint */
+  private long lastCheckpointedSuperstep = -1;
+  /** State of the superstep changed */
+  private final BspEvent superstepStateChanged =
+      new PredicateLock();
+  /** Master graph partitioner */
+  private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
+  /** All the partition stats from the last superstep */
+  private final List<PartitionStats> allPartitionStatsList =
+      new ArrayList<PartitionStats>();
+  /** Aggregator writer */
+  private AggregatorWriter aggregatorWriter;
+
+  /**
+   * Constructor for setting up the master.
+   *
+   * @param serverPortList ZooKeeper server port list
+   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
+   * @param context Mapper context
+   * @param graphMapper Graph mapper
+   */
+  public BspServiceMaster(
+      String serverPortList,
+      int sessionMsecTimeout,
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphMapper<I, V, E, M> graphMapper) {
+    super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    registerBspEvent(superstepStateChanged);
+
+    maxWorkers =
+        getConfiguration().getInt(GiraphJob.MAX_WORKERS, -1);
+    minWorkers =
+        getConfiguration().getInt(GiraphJob.MIN_WORKERS, -1);
+    minPercentResponded =
+        getConfiguration().getFloat(GiraphJob.MIN_PERCENT_RESPONDED,
+            100.0f);
+    msecsPollPeriod =
+        getConfiguration().getInt(GiraphJob.POLL_MSECS,
+            GiraphJob.POLL_MSECS_DEFAULT);
+    maxPollAttempts =
+        getConfiguration().getInt(GiraphJob.POLL_ATTEMPTS,
+            GiraphJob.POLL_ATTEMPTS_DEFAULT);
+    partitionLongTailMinPrint = getConfiguration().getInt(
+        GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT,
+        GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
+    masterGraphPartitioner =
+        getGraphPartitionerFactory().createMasterGraphPartitioner();
+  }
+
+  @Override
+  public void setJobState(ApplicationState state,
+      long applicationAttempt,
+      long desiredSuperstep) {
+    JSONObject jobState = new JSONObject();
+    try {
+      jobState.put(JSONOBJ_STATE_KEY, state.toString());
+      jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
+      jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
+    } catch (JSONException e) {
+      throw new RuntimeException("setJobState: Coudn't put " +
+          state.toString());
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("setJobState: " + jobState.toString() + " on superstep " +
+          getSuperstep());
+    }
+    try {
+      getZkExt().createExt(masterJobStatePath + "/jobState",
+          jobState.toString().getBytes(),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT_SEQUENTIAL,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      throw new IllegalStateException(
+          "setJobState: Imposible that " +
+              masterJobStatePath + " already exists!", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "setJobState: Unknown KeeperException for " +
+              masterJobStatePath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "setJobState: Unknown InterruptedException for " +
+              masterJobStatePath, e);
+    }
+
+    if (state == ApplicationState.FAILED) {
+      failJob();
+    }
+  }
+
+  /**
+   * Master uses this to calculate the {@link VertexInputFormat}
+   * input splits and write them to ZooKeeper.
+   *
+   * @param numWorkers Number of available workers
+   * @return List of input splits
+   * @throws InstantiationException
+   * @throws IllegalAccessException
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private List<InputSplit> generateInputSplits(int numWorkers) {
+    VertexInputFormat<I, V, E, M> vertexInputFormat =
+        BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
+    List<InputSplit> splits;
+    try {
+      splits = vertexInputFormat.getSplits(getContext(), numWorkers);
+      float samplePercent =
+          getConfiguration().getFloat(
+              GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT,
+              GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
+      if (samplePercent != GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+        int lastIndex = (int) (samplePercent * splits.size() / 100f);
+        List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
+        LOG.warn("generateInputSplits: Using sampling - Processing " +
+            "only " + sampleSplits.size() + " instead of " +
+            splits.size() + " expected splits.");
+        return sampleSplits;
+      } else {
         if (LOG.isInfoEnabled()) {
-            LOG.info("setJobState: " + jobState.toString() + " on superstep " +
-                     getSuperstep());
+          LOG.info("generateInputSplits: Got " + splits.size() +
+              " input splits for " + numWorkers + " workers");
         }
-        try {
-            getZkExt().createExt(MASTER_JOB_STATE_PATH + "/jobState",
-                                 jobState.toString().getBytes(),
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT_SEQUENTIAL,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            throw new IllegalStateException(
-                "setJobState: Imposible that " +
-                MASTER_JOB_STATE_PATH + " already exists!", e);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "setJobState: Unknown KeeperException for " +
-                MASTER_JOB_STATE_PATH, e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "setJobState: Unknown InterruptedException for " +
-                MASTER_JOB_STATE_PATH, e);
-        }
-
-        if (state == ApplicationState.FAILED) {
-            failJob();
-        }
-    }
-
-    /**
-     * Master uses this to calculate the {@link VertexInputFormat}
-     * input splits and write it to ZooKeeper.
-     *
-     * @param numWorkers Number of available workers
-     * @throws InstantiationException
-     * @throws IllegalAccessException
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    private List<InputSplit> generateInputSplits(int numWorkers) {
-        VertexInputFormat<I, V, E, M> vertexInputFormat =
-            BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
-        List<InputSplit> splits;
-        try {
-            splits = vertexInputFormat.getSplits(getContext(), numWorkers);
-            float samplePercent =
-                getConfiguration().getFloat(
-                    GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT,
-                    GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
-            if (samplePercent != GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
-                int lastIndex = (int) (samplePercent * splits.size() / 100f);
-                List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
-                LOG.warn("generateInputSplits: Using sampling - Processing " +
-                         "only " + sampleSplits.size() + " instead of " +
-                        splits.size() + " expected splits.");
-                return sampleSplits;
+        return splits;
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "generateInputSplits: Got IOException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "generateInputSplits: Got InterruptedException", e);
+    }
+  }
+
+  /**
+   * When there is no salvaging this job, fail it.
+   *
+   * @throws IOException
+   */
+  private void failJob() {
+    LOG.fatal("failJob: Killing job " + getJobId());
+    try {
+      @SuppressWarnings("deprecation")
+      org.apache.hadoop.mapred.JobClient jobClient =
+          new org.apache.hadoop.mapred.JobClient(
+              (org.apache.hadoop.mapred.JobConf)
+              getConfiguration());
+      @SuppressWarnings("deprecation")
+      org.apache.hadoop.mapred.JobID jobId =
+          org.apache.hadoop.mapred.JobID.forName(getJobId());
+      RunningJob job = jobClient.getJob(jobId);
+      job.killJob();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Parse the {@link WorkerInfo} objects from a ZooKeeper path
+   * (and children).
+   *
+   * @param workerInfosPath Path where all the workers are children
+   * @param watch Watch or not?
+   * @return List of workers in that path
+   */
+  private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
+      boolean watch) {
+    List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
+    List<String> workerInfoPathList;
+    try {
+      workerInfoPathList =
+          getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getWorkers: Got KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getWorkers: Got InterruptedStateException", e);
+    }
+    for (String workerInfoPath : workerInfoPathList) {
+      WorkerInfo workerInfo = new WorkerInfo();
+      WritableUtils.readFieldsFromZnode(
+          getZkExt(), workerInfoPath, true, null, workerInfo);
+      workerInfoList.add(workerInfo);
+    }
+    return workerInfoList;
+  }
+
+  /**
+   * Get the healthy and unhealthy {@link WorkerInfo} objects for
+   * a superstep
+   *
+   * @param superstep superstep to check
+   * @param healthyWorkerInfoList filled in with current data
+   * @param unhealthyWorkerInfoList filled in with current data
+   */
+  private void getAllWorkerInfos(
+      long superstep,
+      List<WorkerInfo> healthyWorkerInfoList,
+      List<WorkerInfo> unhealthyWorkerInfoList) {
+    String healthyWorkerInfoPath =
+        getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
+    String unhealthyWorkerInfoPath =
+        getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
+
+    try {
+      getZkExt().createOnceExt(healthyWorkerInfoPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("getWorkers: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("getWorkers: IllegalStateException"
+          , e);
+    }
+
+    try {
+      getZkExt().createOnceExt(unhealthyWorkerInfoPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("getWorkers: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("getWorkers: IllegalStateException"
+          , e);
+    }
+
+    List<WorkerInfo> currentHealthyWorkerInfoList =
+        getWorkerInfosFromPath(healthyWorkerInfoPath, true);
+    List<WorkerInfo> currentUnhealthyWorkerInfoList =
+        getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
+
+    healthyWorkerInfoList.clear();
+    if (currentHealthyWorkerInfoList != null) {
+      for (WorkerInfo healthyWorkerInfo :
+        currentHealthyWorkerInfoList) {
+        healthyWorkerInfoList.add(healthyWorkerInfo);
+      }
+    }
+
+    unhealthyWorkerInfoList.clear();
+    if (currentUnhealthyWorkerInfoList != null) {
+      for (WorkerInfo unhealthyWorkerInfo :
+        currentUnhealthyWorkerInfoList) {
+        unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
+      }
+    }
+  }
+
+  /**
+   * Check all the {@link WorkerInfo} objects to ensure that a minimum
+   * number of good workers exists out of the total that have reported.
+   *
+   * @return List of healthy workers such that the minimum has been
+   *         met, otherwise null
+   */
+  private List<WorkerInfo> checkWorkers() {
+    boolean failJob = true;
+    int pollAttempt = 0;
+    List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
+    List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
+    int totalResponses = -1;
+    while (pollAttempt < maxPollAttempts) {
+      getAllWorkerInfos(
+          getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
+      totalResponses = healthyWorkerInfoList.size() +
+          unhealthyWorkerInfoList.size();
+      if ((totalResponses * 100.0f / maxWorkers) >=
+          minPercentResponded) {
+        failJob = false;
+        break;
+      }
+      getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
+          "checkWorkers: Only found " +
+          totalResponses +
+          " responses of " + maxWorkers +
+          " needed to start superstep " +
+          getSuperstep());
+      if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
+          msecsPollPeriod)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("checkWorkers: Got event that health " +
+              "registration changed, not using poll attempt");
+        }
+        getWorkerHealthRegistrationChangedEvent().reset();
+        continue;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("checkWorkers: Only found " + totalResponses +
+            " responses of " + maxWorkers +
+            " needed to start superstep " +
+            getSuperstep() + ".  Sleeping for " +
+            msecsPollPeriod + " msecs and used " + pollAttempt +
+            " of " + maxPollAttempts + " attempts.");
+        // Find the missing workers if there are only a few
+        if ((maxWorkers - totalResponses) <=
+            partitionLongTailMinPrint) {
+          Set<Integer> partitionSet = new TreeSet<Integer>();
+          for (WorkerInfo workerInfo : healthyWorkerInfoList) {
+            partitionSet.add(workerInfo.getPartitionId());
+          }
+          for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
+            partitionSet.add(workerInfo.getPartitionId());
+          }
+          for (int i = 1; i <= maxWorkers; ++i) {
+            if (partitionSet.contains(new Integer(i))) {
+              continue;
+            } else if (i == getTaskPartition()) {
+              continue;
             } else {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("generateInputSplits: Got " + splits.size() +
-                            " input splits for " + numWorkers + " workers");
-                }
-                return splits;
-            }
-        } catch (IOException e) {
-            throw new IllegalStateException(
-                "generateInputSplits: Got IOException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "generateInputSplits: Got InterruptedException", e);
-        }
-    }
-
-    /**
-     * When there is no salvaging this job, fail it.
-     *
-     * @throws IOException
-     */
-    private void failJob() {
-        LOG.fatal("failJob: Killing job " + getJobId());
-        try {
-            @SuppressWarnings("deprecation")
-            org.apache.hadoop.mapred.JobClient jobClient =
-                new org.apache.hadoop.mapred.JobClient(
-                    (org.apache.hadoop.mapred.JobConf)
-                    getConfiguration());
-            @SuppressWarnings("deprecation")
-            org.apache.hadoop.mapred.JobID jobId =
-                org.apache.hadoop.mapred.JobID.forName(getJobId());
-            RunningJob job = jobClient.getJob(jobId);
-            job.killJob();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Parse the {@link WorkerInfo} objects from a ZooKeeper path
-     * (and children).
-     *
-     * @param workerInfosPath Path where all the workers are children
-     * @param watch Watch or not?
-     * @return List of workers in that path
-     */
-    private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
-                                                    boolean watch) {
-        List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
-        List<String> workerInfoPathList;
-        try {
-            workerInfoPathList =
-                getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "getWorkers: Got KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "getWorkers: Got InterruptedStateException", e);
-        }
-        for (String workerInfoPath : workerInfoPathList) {
-            WorkerInfo workerInfo = new WorkerInfo();
-            WritableUtils.readFieldsFromZnode(
-                getZkExt(), workerInfoPath, true, null, workerInfo);
-            workerInfoList.add(workerInfo);
-        }
-        return workerInfoList;
-    }
-
-    /**
-     * Get the healthy and unhealthy {@link WorkerInfo} objects for
-     * a superstep
-     *
-     * @param superstep superstep to check
-     * @param healthyWorkerInfoList filled in with current data
-     * @param unhealthyWorkerInfoList filled in with current data
-     */
-    private void getAllWorkerInfos(
-            long superstep,
-            List<WorkerInfo> healthyWorkerInfoList,
-            List<WorkerInfo> unhealthyWorkerInfoList) {
-        String healthyWorkerInfoPath =
-            getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
-        String unhealthyWorkerInfoPath =
-            getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
-
-        try {
-            getZkExt().createOnceExt(healthyWorkerInfoPath,
-                                     null,
-                                     Ids.OPEN_ACL_UNSAFE,
-                                     CreateMode.PERSISTENT,
-                                     true);
-        } catch (KeeperException e) {
-            throw new IllegalStateException("getWorkers: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException("getWorkers: IllegalStateException"
-                                            , e);
-        }
-
-        try {
-            getZkExt().createOnceExt(unhealthyWorkerInfoPath,
-                                     null,
-                                     Ids.OPEN_ACL_UNSAFE,
-                                     CreateMode.PERSISTENT,
-                                     true);
-        } catch (KeeperException e) {
-            throw new IllegalStateException("getWorkers: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException("getWorkers: IllegalStateException"
-                                            , e);
-        }
-
-        List<WorkerInfo> currentHealthyWorkerInfoList =
-            getWorkerInfosFromPath(healthyWorkerInfoPath, true);
-        List<WorkerInfo> currentUnhealthyWorkerInfoList =
-            getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
-
-        healthyWorkerInfoList.clear();
-        if (currentHealthyWorkerInfoList != null) {
-            for (WorkerInfo healthyWorkerInfo :
-                    currentHealthyWorkerInfoList) {
-                healthyWorkerInfoList.add(healthyWorkerInfo);
-            }
-        }
-
-        unhealthyWorkerInfoList.clear();
-        if (currentUnhealthyWorkerInfoList != null) {
-            for (WorkerInfo unhealthyWorkerInfo :
-                    currentUnhealthyWorkerInfoList) {
-                unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
-            }
-        }
-    }
-
-    /**
-     * Check all the {@link WorkerInfo} objects to ensure that a minimum
-     * number of good workers exists out of the total that have reported.
-     *
-     * @return List of of healthy workers such that the minimum has been
-     *         met, otherwise null
-     */
-    private List<WorkerInfo> checkWorkers() {
-        boolean failJob = true;
-        int pollAttempt = 0;
-        List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
-        List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
-        int totalResponses = -1;
-        while (pollAttempt < maxPollAttempts) {
-            getAllWorkerInfos(
-                getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
-            totalResponses = healthyWorkerInfoList.size() +
-                unhealthyWorkerInfoList.size();
-            if ((totalResponses * 100.0f / maxWorkers) >=
-                    minPercentResponded) {
-                failJob = false;
-                break;
-            }
-            getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
-                                   "checkWorkers: Only found " +
-                                   totalResponses +
-                                   " responses of " + maxWorkers +
-                                   " needed to start superstep " +
-                                   getSuperstep());
-            if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
-                    msecsPollPeriod)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("checkWorkers: Got event that health " +
-                              "registration changed, not using poll attempt");
-                }
-                getWorkerHealthRegistrationChangedEvent().reset();
-                continue;
-            }
-            if (LOG.isInfoEnabled()) {
-                LOG.info("checkWorkers: Only found " + totalResponses +
-                         " responses of " + maxWorkers +
-                         " needed to start superstep " +
-                         getSuperstep() + ".  Sleeping for " +
-                         msecsPollPeriod + " msecs and used " + pollAttempt +
-                         " of " + maxPollAttempts + " attempts.");
-                // Find the missing workers if there are only a few
-                if ((maxWorkers - totalResponses) <=
-                        partitionLongTailMinPrint) {
-                    Set<Integer> partitionSet = new TreeSet<Integer>();
-                    for (WorkerInfo workerInfo : healthyWorkerInfoList) {
-                        partitionSet.add(workerInfo.getPartitionId());
-                    }
-                    for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
-                        partitionSet.add(workerInfo.getPartitionId());
-                    }
-                    for (int i = 1; i <= maxWorkers; ++i) {
-                        if (partitionSet.contains(new Integer(i))) {
-                            continue;
-                        } else if (i == getTaskPartition()) {
-                            continue;
-                        } else {
-                            LOG.info("checkWorkers: No response from "+
-                                     "partition " + i + " (could be master)");
-                        }
-                    }
-                }
-            }
-            ++pollAttempt;
-        }
-        if (failJob) {
-            LOG.error("checkWorkers: Did not receive enough processes in " +
-                      "time (only " + totalResponses + " of " +
-                      minWorkers + " required).  This occurs if you do not " +
-                      "have enough map tasks available simultaneously on " +
-                      "your Hadoop instance to fulfill the number of " +
-                      "requested workers.");
-            return null;
-        }
-
-        if (healthyWorkerInfoList.size() < minWorkers) {
-            LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
-                      " available when " + minWorkers + " are required.");
-            return null;
-        }
-
-        getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
-            "checkWorkers: Done - Found " + totalResponses +
-            " responses of " + maxWorkers + " needed to start superstep " +
-            getSuperstep());
-
-        return healthyWorkerInfoList;
-    }
-
-    @Override
-    public int createInputSplits() {
-        // Only the 'master' should be doing this.  Wait until the number of
-        // processes that have reported health exceeds the minimum percentage.
-        // If the minimum percentage is not met, fail the job.  Otherwise
-        // generate the input splits
-        try {
-            if (getZkExt().exists(INPUT_SPLIT_PATH, false) != null) {
-                LOG.info(INPUT_SPLIT_PATH +
-                         " already exists, no need to create");
-                return Integer.parseInt(
-                    new String(
-                        getZkExt().getData(INPUT_SPLIT_PATH, false, null)));
-            }
-        } catch (KeeperException.NoNodeException e) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("createInputSplits: Need to create the " +
-                         "input splits at " + INPUT_SPLIT_PATH);
-            }
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "createInputSplits: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "createInputSplits: IllegalStateException", e);
-        }
-
-        // When creating znodes, in case the master has already run, resume
-        // where it left off.
-        List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
-        if (healthyWorkerInfoList == null) {
-            setJobState(ApplicationState.FAILED, -1, -1);
-            return -1;
-        }
-
-        // Note that the input splits may only be a sample if
-        // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
-        List<InputSplit> splitList =
-            generateInputSplits(healthyWorkerInfoList.size());
-        if (healthyWorkerInfoList.size() > splitList.size()) {
-            LOG.warn("createInputSplits: Number of inputSplits="
-                     + splitList.size() + " < " +
-                     healthyWorkerInfoList.size() +
-                     "=number of healthy processes, " +
-                     "some workers will be not used");
-        }
-        String inputSplitPath = null;
-        for (int i = 0; i< splitList.size(); ++i) {
-            try {
-                ByteArrayOutputStream byteArrayOutputStream =
-                    new ByteArrayOutputStream();
-                DataOutput outputStream =
-                    new DataOutputStream(byteArrayOutputStream);
-                InputSplit inputSplit = splitList.get(i);
-                Text.writeString(outputStream,
-                                 inputSplit.getClass().getName());
-                ((Writable) inputSplit).write(outputStream);
-                inputSplitPath = INPUT_SPLIT_PATH + "/" + i;
-                getZkExt().createExt(inputSplitPath,
-                                     byteArrayOutputStream.toByteArray(),
-                                     Ids.OPEN_ACL_UNSAFE,
-                                     CreateMode.PERSISTENT,
-                                     true);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("createInputSplits: Created input split " +
-                              "with index " + i + " serialized as " +
-                              byteArrayOutputStream.toString());
-                }
-            } catch (KeeperException.NodeExistsException e) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("createInputSplits: Node " +
-                             inputSplitPath + " already exists.");
-                }
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "createInputSplits: KeeperException", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "createInputSplits: IllegalStateException", e);
-            } catch (IOException e) {
-                throw new IllegalStateException(
-                    "createInputSplits: IOException", e);
-            }
-        }
-
-        // Let workers know they can start trying to load the input splits
-        try {
-            getZkExt().create(INPUT_SPLITS_ALL_READY_PATH,
-                        null,
-                        Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-        } catch (KeeperException.NodeExistsException e) {
-            LOG.info("createInputSplits: Node " +
-                     INPUT_SPLITS_ALL_READY_PATH + " already exists.");
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "createInputSplits: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "createInputSplits: IllegalStateException", e);
-        }
-
-        return splitList.size();
-    }
-
-    /**
-     * Read the finalized checkpoint file and associated metadata files for the
-     * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
-     * checkpoint prefixes.  It is an optimization to prevent all workers from
-     * searching all the files.  Also read in the aggregator data from the
-     * finalized checkpoint file and setting it.
-     *
-     * @param superstep Checkpoint set to examine.
-     * @param partitionOwners Partition owners to modify with checkpoint
-     *        prefixes
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws KeeperException
-     */
-    private void prepareCheckpointRestart(
-            long superstep,
-            Collection<PartitionOwner> partitionOwners)
-            throws IOException, KeeperException, InterruptedException {
-        FileSystem fs = getFs();
-        List<Path> validMetadataPathList = new ArrayList<Path>();
-        String finalizedCheckpointPath =
-            getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
-        DataInputStream finalizedStream =
-            fs.open(new Path(finalizedCheckpointPath));
-        int prefixFileCount = finalizedStream.readInt();
-        for (int i = 0; i < prefixFileCount; ++i) {
-            String metadataFilePath =
-                finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX;
-            validMetadataPathList.add(new Path(metadataFilePath));
-        }
-
-        // Set the merged aggregator data if it exists.
-        int aggregatorDataSize = finalizedStream.readInt();
-        if (aggregatorDataSize > 0) {
-            byte [] aggregatorZkData = new byte[aggregatorDataSize];
-            int actualDataRead =
-                finalizedStream.read(aggregatorZkData, 0, aggregatorDataSize);
-            if (actualDataRead != aggregatorDataSize) {
-                throw new RuntimeException(
-                    "prepareCheckpointRestart: Only read " + actualDataRead +
-                    " of " + aggregatorDataSize + " aggregator bytes from " +
-                    finalizedCheckpointPath);
-            }
-            String mergedAggregatorPath =
-                getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
-            if (LOG.isInfoEnabled()) {
-                LOG.info("prepareCheckpointRestart: Reloading merged " +
-                         "aggregator " + "data '" +
-                         Arrays.toString(aggregatorZkData) +
-                         "' to previous checkpoint in path " +
-                         mergedAggregatorPath);
-            }
-            if (getZkExt().exists(mergedAggregatorPath, false) == null) {
-                getZkExt().createExt(mergedAggregatorPath,
-                                     aggregatorZkData,
-                                     Ids.OPEN_ACL_UNSAFE,
-                                     CreateMode.PERSISTENT,
-                                     true);
-            }
-            else {
-                getZkExt().setData(mergedAggregatorPath, aggregatorZkData, -1);
-            }
-        }
-        finalizedStream.close();
-
-        Map<Integer, PartitionOwner> idOwnerMap =
-            new HashMap<Integer, PartitionOwner>();
-        for (PartitionOwner partitionOwner : partitionOwners) {
-            if (idOwnerMap.put(partitionOwner.getPartitionId(),
-                               partitionOwner) != null) {
-                throw new IllegalStateException(
-                    "prepareCheckpointRestart: Duplicate partition " +
-                    partitionOwner);
+              LOG.info("checkWorkers: No response from " +
+                  "partition " + i + " (could be master)");
             }
+          }
         }
-        // Reading the metadata files.  Simply assign each partition owner
-        // the correct file prefix based on the partition id.
-        for (Path metadataPath : validMetadataPathList) {
-            String checkpointFilePrefix = metadataPath.toString();
-            checkpointFilePrefix =
-                checkpointFilePrefix.substring(
-                0,
-                checkpointFilePrefix.length() -
-                CHECKPOINT_METADATA_POSTFIX.length());
-            DataInputStream metadataStream = fs.open(metadataPath);
-            long partitions = metadataStream.readInt();
-            for (long i = 0; i < partitions; ++i) {
-                long dataPos = metadataStream.readLong();
-                int partitionId = metadataStream.readInt();
-                PartitionOwner partitionOwner = idOwnerMap.get(partitionId);
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("prepareSuperstepRestart: File " + metadataPath +
-                              " with position " + dataPos +
-                              ", partition id = " + partitionId +
-                              " assigned to " + partitionOwner);
-                }
-                partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix);
-            }
-            metadataStream.close();
-        }
+      }
+      ++pollAttempt;
     }
-
-    @Override
-    public void setup() {
-        // Might have to manually load a checkpoint.
-        // In that case, the input splits are not set, they will be faked by
-        // the checkpoint files.  Each checkpoint file will be an input split
-        // and the input split
-        superstepCounter = getContext().getCounter(
-            GIRAPH_STATS_COUNTER_GROUP_NAME, "Superstep");
-        vertexCounter = getContext().getCounter(
-            GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate vertices");
-        finishedVertexCounter = getContext().getCounter(
-            GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate finished vertices");
-        edgeCounter = getContext().getCounter(
-            GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate edges");
-        sentMessagesCounter = getContext().getCounter(
-            GIRAPH_STATS_COUNTER_GROUP_NAME, "Sent messages");
-        currentWorkersCounter = getContext().getCounter(
-            GIRAPH_STATS_COUNTER_GROUP_NAME, "Current workers");
-        currentMasterTaskPartitionCounter = getContext().getCounter(
-            GIRAPH_STATS_COUNTER_GROUP_NAME, "Current master task partition");
-        lastCheckpointedSuperstepCounter = getContext().getCounter(
-            GIRAPH_STATS_COUNTER_GROUP_NAME, "Last checkpointed superstep");
-        if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
-            superstepCounter.increment(getRestartedSuperstep());
+    if (failJob) {
+      LOG.error("checkWorkers: Did not receive enough processes in " +
+          "time (only " + totalResponses + " of " +
+          minWorkers + " required).  This occurs if you do not " +
+          "have enough map tasks available simultaneously on " +
+          "your Hadoop instance to fulfill the number of " +
+          "requested workers.");
+      return null;
+    }
+
+    if (healthyWorkerInfoList.size() < minWorkers) {
+      LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
+          " available when " + minWorkers + " are required.");
+      return null;
+    }
+
+    getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
+        "checkWorkers: Done - Found " + totalResponses +
+        " responses of " + maxWorkers + " needed to start superstep " +
+        getSuperstep());
+
+    return healthyWorkerInfoList;
+  }
+
+  @Override
+  public int createInputSplits() {
+    // Only the 'master' should be doing this.  Wait until the number of
+    // processes that have reported health exceeds the minimum percentage.
+    // If the minimum percentage is not met, fail the job.  Otherwise
+    // generate the input splits
+    try {
+      if (getZkExt().exists(inputSplitsPath, false) != null) {
+        LOG.info(inputSplitsPath +
+            " already exists, no need to create");
+        return Integer.parseInt(
+            new String(
+                getZkExt().getData(inputSplitsPath, false, null)));
+      }
+    } catch (KeeperException.NoNodeException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("createInputSplits: Need to create the " +
+            "input splits at " + inputSplitsPath);
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "createInputSplits: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "createInputSplits: InterrtupedException", e);
+    }
+
+    // When creating znodes, in case the master has already run, resume
+    // where it left off.
+    List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
+    if (healthyWorkerInfoList == null) {
+      setJobState(ApplicationState.FAILED, -1, -1);
+      return -1;
+    }
+
+    // Note that the input splits may only be a sample if
+    // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
+    List<InputSplit> splitList =
+        generateInputSplits(healthyWorkerInfoList.size());
+    if (healthyWorkerInfoList.size() > splitList.size()) {
+      LOG.warn("createInputSplits: Number of inputSplits=" +
+          splitList.size() + " < " +
+          healthyWorkerInfoList.size() +
+          "=number of healthy processes, " +
+          "some workers will be not used");
+    }
+    String inputSplitPath = null;
+    for (int i = 0; i < splitList.size(); ++i) {
+      try {
+        ByteArrayOutputStream byteArrayOutputStream =
+            new ByteArrayOutputStream();
+        DataOutput outputStream =
+            new DataOutputStream(byteArrayOutputStream);
+        InputSplit inputSplit = splitList.get(i);
+        Text.writeString(outputStream,
+            inputSplit.getClass().getName());
+        ((Writable) inputSplit).write(outputStream);
+        inputSplitPath = inputSplitsPath + "/" + i;
+        getZkExt().createExt(inputSplitPath,
+            byteArrayOutputStream.toByteArray(),
+            Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT,
+            true);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("createInputSplits: Created input split " +
+              "with index " + i + " serialized as " +
+              byteArrayOutputStream.toString());
         }
-    }
-
-    @Override
-    public boolean becomeMaster() {
-        // Create my bid to become the master, then try to become the worker
-        // or return false.
-        String myBid = null;
-        try {
-            myBid =
-                getZkExt().createExt(MASTER_ELECTION_PATH +
-                    "/" + getHostnamePartitionId(),
-                    null,
-                    Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.EPHEMERAL_SEQUENTIAL,
-                    true);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "becomeMaster: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "becomeMaster: IllegalStateException", e);
-        }
-        while (true) {
-            JSONObject jobState = getJobState();
-            try {
-                if ((jobState != null) &&
-                    ApplicationState.valueOf(
-                        jobState.getString(JSONOBJ_STATE_KEY)) ==
-                            ApplicationState.FINISHED) {
-                    LOG.info("becomeMaster: Job is finished, " +
-                             "give up trying to be the master!");
-                    isMaster = false;
-                    return isMaster;
-                }
-            } catch (JSONException e) {
-                throw new IllegalStateException(
-                    "becomeMaster: Couldn't get state from " + jobState, e);
-            }
-            try {
-                List<String> masterChildArr =
-                    getZkExt().getChildrenExt(
-                        MASTER_ELECTION_PATH, true, true, true);
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("becomeMaster: First child is '" +
-                             masterChildArr.get(0) + "' and my bid is '" +
-                             myBid + "'");
-                }
-                if (masterChildArr.get(0).equals(myBid)) {
-                    currentMasterTaskPartitionCounter.increment(
-                        getTaskPartition() -
-                        currentMasterTaskPartitionCounter.getValue());
-                    aggregatorWriter = 
-                        BspUtils.createAggregatorWriter(getConfiguration());
-                    try {
-                        aggregatorWriter.initialize(getContext(),
-                                                    getApplicationAttempt());
-                    } catch (IOException e) {
-                        throw new IllegalStateException("becomeMaster: " +
-                            "Couldn't initialize aggregatorWriter", e);
-                    }
-                    LOG.info("becomeMaster: I am now the master!");
-                    isMaster = true;
-                    return isMaster;
-                }
-                LOG.info("becomeMaster: Waiting to become the master...");
-                getMasterElectionChildrenChangedEvent().waitForever();
-                getMasterElectionChildrenChangedEvent().reset();
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "becomeMaster: KeeperException", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "becomeMaster: IllegalStateException", e);
-            }
-        }
-    }
-
-    /**
-     * Collect and aggregate the worker statistics for a particular superstep.
-     *
-     * @param superstep Superstep to aggregate on
-     * @return Global statistics aggregated on all worker statistics
-     */
-    private GlobalStats aggregateWorkerStats(long superstep) {
-        Class<? extends Writable> partitionStatsClass =
-            masterGraphPartitioner.createPartitionStats().getClass();
-        GlobalStats globalStats = new GlobalStats();
-        // Get the stats from the all the worker selected nodes
-        String workerFinishedPath =
-            getWorkerFinishedPath(getApplicationAttempt(), superstep);
-        List<String> workerFinishedPathList = null;
-        try {
-            workerFinishedPathList =
-                getZkExt().getChildrenExt(
-                    workerFinishedPath, false, false, true);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "aggregateWorkerStats: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "aggregateWorkerStats: InterruptedException", e);
-        }
-
-        allPartitionStatsList.clear();
-        for (String finishedPath : workerFinishedPathList) {
-            JSONObject workerFinishedInfoObj = null;
-            try {
-                byte [] zkData =
-                    getZkExt().getData(finishedPath, false, null);
-                workerFinishedInfoObj = new JSONObject(new String(zkData));
-                List<? extends Writable> writableList =
-                    WritableUtils.readListFieldsFromByteArray(
-                        Base64.decode(workerFinishedInfoObj.getString(
-                            JSONOBJ_PARTITION_STATS_KEY)),
-                        partitionStatsClass,
-                        getConfiguration());
-                for (Writable writable : writableList) {
-                    globalStats.addPartitionStats((PartitionStats) writable);
-                    globalStats.addMessageCount(
-                        workerFinishedInfoObj.getLong(
-                            JSONOBJ_NUM_MESSAGES_KEY));
-                    allPartitionStatsList.add((PartitionStats) writable);
-                }
-            } catch (JSONException e) {
-                throw new IllegalStateException(
-                    "aggregateWorkerStats: JSONException", e);
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "aggregateWorkerStats: KeeperException", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "aggregateWorkerStats: InterruptedException", e);
-            } catch (IOException e) {
-                throw new IllegalStateException(
-                    "aggregateWorkerStats: IOException", e);
-            }
-         }
-
+      } catch (KeeperException.NodeExistsException e) {
         if (LOG.isInfoEnabled()) {
-            LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
-                     " on superstep = " + getSuperstep());
-        }
-        return globalStats;
-    }
-
-    /**
-     * Get the aggregator values for a particular superstep,
-     * aggregate and save them. Does nothing on the INPUT_SUPERSTEP.
-     *
-     * @param superstep superstep to check
-     */
-    private void collectAndProcessAggregatorValues(long superstep) {
-        if (superstep == INPUT_SUPERSTEP) {
-            // Nothing to collect on the input superstep
-            return;
-        }
-        Map<String, Aggregator<? extends Writable>> aggregatorMap =
-            new TreeMap<String, Aggregator<? extends Writable>>();
-        String workerFinishedPath =
-            getWorkerFinishedPath(getApplicationAttempt(), superstep);
-        List<String> hostnameIdPathList = null;
-        try {
-            hostnameIdPathList =
-                getZkExt().getChildrenExt(
-                    workerFinishedPath, false, false, true);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "collectAndProcessAggregatorValues: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "collectAndProcessAggregatorValues: InterruptedException", e);
-        }
-
-        for (String hostnameIdPath : hostnameIdPathList) {
-            JSONObject workerFinishedInfoObj = null;
-            JSONArray aggregatorArray = null;
-            try {
-                byte [] zkData =
-                    getZkExt().getData(hostnameIdPath, false, null);
-                workerFinishedInfoObj = new JSONObject(new String(zkData));
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "collectAndProcessAggregatorValues: KeeperException", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "collectAndProcessAggregatorValues: InterruptedException",
-                    e);
-            } catch (JSONException e) {
-                throw new IllegalStateException(
-                    "collectAndProcessAggregatorValues: JSONException", e);
-            }
-            try {
-                aggregatorArray = workerFinishedInfoObj.getJSONArray(
-                    JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY);
-            } catch (JSONException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("collectAndProcessAggregatorValues: " +
-                              "No aggregators" + " for " + hostnameIdPath);
-                }
-                continue;
-            }
-            for (int i = 0; i < aggregatorArray.length(); ++i) {
-                try {
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info("collectAndProcessAggregatorValues: " +
-                                 "Getting aggregators from " +
-                                 aggregatorArray.getJSONObject(i));
-                    }
-                    String aggregatorName =
-                        aggregatorArray.getJSONObject(i).getString(
-                            AGGREGATOR_NAME_KEY);
-                    String aggregatorClassName =
-                        aggregatorArray.getJSONObject(i).getString(
-                            AGGREGATOR_CLASS_NAME_KEY);
-                    @SuppressWarnings("unchecked")
-                    Aggregator<Writable> aggregator =
-                        (Aggregator<Writable>) aggregatorMap.get(aggregatorName);
-                    boolean firstTime = false;
-                    if (aggregator == null) {
-                        @SuppressWarnings("unchecked")
-                        Aggregator<Writable> aggregatorWritable =
-                            (Aggregator<Writable>) getAggregator(aggregatorName);
-                        aggregator = aggregatorWritable;
-                        if (aggregator == null) {
-                            @SuppressWarnings("unchecked")
-                            Class<? extends Aggregator<Writable>> aggregatorClass =
-                                (Class<? extends Aggregator<Writable>>)
-                                    Class.forName(aggregatorClassName);
-                            aggregator = registerAggregator(
-                                aggregatorName,
-                                aggregatorClass);
-                        }
-                        aggregatorMap.put(aggregatorName, aggregator);
-                        firstTime = true;
-                    }
-                    Writable aggregatorValue =
-                        aggregator.createAggregatedValue();
-                    InputStream input =
-                        new ByteArrayInputStream(
-                            Base64.decode(
-                                aggregatorArray.getJSONObject(i).
-                                getString(AGGREGATOR_VALUE_KEY)));
-                    aggregatorValue.readFields(new DataInputStream(input));
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("collectAndProcessAggregatorValues: " +
-                                  "aggregator value size=" + input.available() +
-                                  " for aggregator=" + aggregatorName +
-                                  " value=" + aggregatorValue);
-                    }
-                    if (firstTime) {
-                        aggregator.setAggregatedValue(aggregatorValue);
-                    } else {
-                        aggregator.aggregate(aggregatorValue);
-                    }
-                } catch (IOException e) {
-                    throw new IllegalStateException(
-                        "collectAndProcessAggregatorValues: " +
-                        "IOException when reading aggregator data " +
-                        aggregatorArray, e);
-                } catch (JSONException e) {
-                    throw new IllegalStateException(
-                        "collectAndProcessAggregatorValues: " +
-                        "JSONException when reading aggregator data " +
-                        aggregatorArray, e);
-                } catch (ClassNotFoundException e) {
-                    throw new IllegalStateException(
-                        "collectAndProcessAggregatorValues: " +
-                        "ClassNotFoundException when reading aggregator data " +
-                        aggregatorArray, e);
-                } catch (InstantiationException e) {
-                    throw new IllegalStateException(
-                        "collectAndProcessAggregatorValues: " +
-                        "InstantiationException when reading aggregator data " +
-                        aggregatorArray, e);
-                } catch (IllegalAccessException e) {
-                    throw new IllegalStateException(
-                        "collectAndProcessAggregatorValues: " +
-                        "IOException when reading aggregator data " +
-                        aggregatorArray, e);
-                }
-            }
+          LOG.info("createInputSplits: Node " +
+              inputSplitPath + " already exists.");
         }
-        if (aggregatorMap.size() > 0) {
-            String mergedAggregatorPath =
-                getMergedAggregatorPath(getApplicationAttempt(), superstep);
-            byte [] zkData = null;
-            JSONArray aggregatorArray = new JSONArray();
-            for (Map.Entry<String, Aggregator<? extends Writable>> entry :
-                    aggregatorMap.entrySet()) {
-                try {
-                    ByteArrayOutputStream outputStream =
-                        new ByteArrayOutputStream();
-                    DataOutput output = new DataOutputStream(outputStream);
-                    entry.getValue().getAggregatedValue().write(output);
-
-                    JSONObject aggregatorObj = new JSONObject();
-                    aggregatorObj.put(AGGREGATOR_NAME_KEY,
-                                      entry.getKey());
-                    aggregatorObj.put(
-                        AGGREGATOR_VALUE_KEY,
-                        Base64.encodeBytes(outputStream.toByteArray()));
-                    aggregatorArray.put(aggregatorObj);
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info("collectAndProcessAggregatorValues: " +
-                                 "Trying to add aggregatorObj " +
-                                 aggregatorObj + "(" +
-                                 entry.getValue().getAggregatedValue() +
-                                 ") to merged aggregator path " +
-                                 mergedAggregatorPath);
-                    }
-                } catch (IOException e) {
-                    throw new IllegalStateException(
-                        "collectAndProcessAggregatorValues: " +
-                        "IllegalStateException", e);
-                } catch (JSONException e) {
-                    throw new IllegalStateException(
-                        "collectAndProcessAggregatorValues: JSONException", e);
-                }
-            }
-            try {
-                zkData = aggregatorArray.toString().getBytes();
-                getZkExt().createExt(mergedAggregatorPath,
-                                     zkData,
-                                     Ids.OPEN_ACL_UNSAFE,
-                                     CreateMode.PERSISTENT,
-                                     true);
-            } catch (KeeperException.NodeExistsException e) {
-                LOG.warn("collectAndProcessAggregatorValues: " +
-                         mergedAggregatorPath+
-                         " already exists!");
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "collectAndProcessAggregatorValues: KeeperException", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "collectAndProcessAggregatorValues: IllegalStateException",
-                    e);
-            }
-            if (LOG.isInfoEnabled()) {
-                LOG.info("collectAndProcessAggregatorValues: Finished " +
-                         "loading " +
-                         mergedAggregatorPath+ " with aggregator values " +
-                         aggregatorArray);
-            }
-        }
-    }
-
-    /**
-     * Finalize the checkpoint file prefixes by taking the chosen workers and
-     * writing them to a finalized file.  Also write out the master
-     * aggregated aggregator array from the previous superstep.
-     *
-     * @param superstep superstep to finalize
-     * @param chosenWorkerList list of chosen workers that will be finalized
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws KeeperException
-     */
-    private void finalizeCheckpoint(
-            long superstep,
-            List<WorkerInfo> chosenWorkerInfoList)
-            throws IOException, KeeperException, InterruptedException {
-        Path finalizedCheckpointPath =
-            new Path(getCheckpointBasePath(superstep) +
-                     CHECKPOINT_FINALIZED_POSTFIX);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "createInputSplits: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "createInputSplits: IllegalStateException", e);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "createInputSplits: IOException", e);
+      }
+    }
+
+    // Let workers know they can start trying to load the input splits
+    try {
+      getZkExt().create(inputSplitsAllReadyPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.info("createInputSplits: Node " +
+          inputSplitsAllReadyPath + " already exists.");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "createInputSplits: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "createInputSplits: IllegalStateException", e);
+    }
+
+    return splitList.size();
+  }
+
+  /**
+   * Read the finalized checkpoint file and associated metadata files for the
+   * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
+   * checkpoint prefixes.  It is an optimization to prevent all workers from
+   * searching all the files.  Also reads the merged aggregator data from
+   * the finalized checkpoint file and stores it in ZooKeeper under the
+   * merged aggregator path of superstep - 1.
+   *
+   * @param superstep Checkpoint set to examine.
+   * @param partitionOwners Partition owners to modify with checkpoint
+   *        prefixes
+   * @throws IOException If a checkpoint file cannot be opened or read
+   * @throws InterruptedException If a ZooKeeper call is interrupted
+   * @throws KeeperException If a ZooKeeper call fails
+   */
+  private void prepareCheckpointRestart(
+    long superstep,
+    Collection<PartitionOwner> partitionOwners)
+    throws IOException, KeeperException, InterruptedException {
+    FileSystem fs = getFs();
+    List<Path> validMetadataPathList = new ArrayList<Path>();
+    String finalizedCheckpointPath =
+        getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+    DataInputStream finalizedStream =
+        fs.open(new Path(finalizedCheckpointPath));
+    // Close the stream even when parsing or a ZooKeeper call fails.
+    try {
+      int prefixFileCount = finalizedStream.readInt();
+      for (int i = 0; i < prefixFileCount; ++i) {
+        String metadataFilePath =
+            finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX;
+        validMetadataPathList.add(new Path(metadataFilePath));
+      }
+
+      // Set the merged aggregator data if it exists.
+      int aggregatorDataSize = finalizedStream.readInt();
+      if (aggregatorDataSize > 0) {
+        byte [] aggregatorZkData = new byte[aggregatorDataSize];
+        try {
+          // A single read() may legally return fewer bytes than requested;
+          // readFully() only stops short at a real end of file.
+          finalizedStream.readFully(aggregatorZkData);
+        } catch (java.io.EOFException e) {
+          throw new RuntimeException(
+              "prepareCheckpointRestart: Could not read all " +
+              aggregatorDataSize + " aggregator bytes from " +
+              finalizedCheckpointPath, e);
+        }
+        String mergedAggregatorPath =
+            getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("prepareCheckpointRestart: Reloading merged " +
+              "aggregator " + "data '" +
+              Arrays.toString(aggregatorZkData) +
+              "' to previous checkpoint in path " +
+              mergedAggregatorPath);
+        }
+        // Create the znode when missing, otherwise overwrite its data
+        // regardless of version (-1).
+        if (getZkExt().exists(mergedAggregatorPath, false) == null) {
+          getZkExt().createExt(mergedAggregatorPath,
+              aggregatorZkData,
+              Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT,
+              true);
+        } else {
+          getZkExt().setData(mergedAggregatorPath, aggregatorZkData, -1);
+        }
+      }
+    } finally {
+      finalizedStream.close();
+    }
+
+    Map<Integer, PartitionOwner> idOwnerMap =
+        new HashMap<Integer, PartitionOwner>();
+    for (PartitionOwner partitionOwner : partitionOwners) {
+      if (idOwnerMap.put(partitionOwner.getPartitionId(),
+          partitionOwner) != null) {
+        throw new IllegalStateException(
+            "prepareCheckpointRestart: Duplicate partition " +
+                partitionOwner);
+      }
+    }
+    // Reading the metadata files.  Simply assign each partition owner
+    // the correct file prefix based on the partition id.
+    for (Path metadataPath : validMetadataPathList) {
+      String checkpointFilePrefix = metadataPath.toString();
+      checkpointFilePrefix =
+          checkpointFilePrefix.substring(
+              0,
+              checkpointFilePrefix.length() -
+              CHECKPOINT_METADATA_POSTFIX.length());
+      DataInputStream metadataStream = fs.open(metadataPath);
+      try {
+        int partitions = metadataStream.readInt();
+        for (int i = 0; i < partitions; ++i) {
+          // Each record is <data position (long)><partition id (int)>;
+          // the position is only reported in the log here.
+          long dataPos = metadataStream.readLong();
+          int partitionId = metadataStream.readInt();
+          PartitionOwner partitionOwner = idOwnerMap.get(partitionId);
+          if (partitionOwner == null) {
+            // Fail with context instead of a bare NullPointerException
+            throw new IllegalStateException(
+                "prepareCheckpointRestart: No partition owner for " +
+                "partition id " + partitionId + " listed in " +
+                metadataPath);
+          }
+          if (LOG.isInfoEnabled()) {
+            LOG.info("prepareCheckpointRestart: File " + metadataPath +
+                " with position " + dataPos +
+                ", partition id = " + partitionId +
+                " assigned to " + partitionOwner);
+          }
+          partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix);
+        }
+      } finally {
+        metadataStream.close();
+      }
+    }
+  }
+
+  @Override
+  public void setup() {
+    // Look up every job counter the master reports through.  When the job
+    // was restarted from a superstep (restarted superstep is set), the
+    // superstep counter is advanced by that superstep.
+    final String group = GIRAPH_STATS_COUNTER_GROUP_NAME;
+    superstepCounter =
+        getContext().getCounter(group, "Superstep");
+    vertexCounter =
+        getContext().getCounter(group, "Aggregate vertices");
+    finishedVertexCounter =
+        getContext().getCounter(group, "Aggregate finished vertices");
+    edgeCounter =
+        getContext().getCounter(group, "Aggregate edges");
+    sentMessagesCounter =
+        getContext().getCounter(group, "Sent messages");
+    currentWorkersCounter =
+        getContext().getCounter(group, "Current workers");
+    currentMasterTaskPartitionCounter =
+        getContext().getCounter(group, "Current master task partition");
+    lastCheckpointedSuperstepCounter =
+        getContext().getCounter(group, "Last checkpointed superstep");
+
+    final long restartedSuperstep = getRestartedSuperstep();
+    if (restartedSuperstep == UNSET_SUPERSTEP) {
+      return;
+    }
+    superstepCounter.increment(restartedSuperstep);
+  }
+
+  /**
+   * Bid for mastership in ZooKeeper and block until this task either wins
+   * the election or observes that the job is finished.
+   *
+   * @return true if this task became the master, false if the job state is
+   *         FINISHED and the task gave up
+   */
+  @Override
+  public boolean becomeMaster() {
+    // Create my bid to become the master, then try to become the master
+    // or return false.
+    String myBid = null;
+    try {
+      myBid =
+          getZkExt().createExt(masterElectionPath +
+              "/" + getHostnamePartitionId(),
+              null,
+              Ids.OPEN_ACL_UNSAFE,
+              CreateMode.EPHEMERAL_SEQUENTIAL,
+              true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "becomeMaster: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "becomeMaster: InterruptedException", e);
+    }
+    while (true) {
+      // Stop bidding as soon as the job is reported finished
+      JSONObject jobState = getJobState();
+      try {
+        if ((jobState != null) &&
+            ApplicationState.valueOf(
+                jobState.getString(JSONOBJ_STATE_KEY)) ==
+                ApplicationState.FINISHED) {
+          LOG.info("becomeMaster: Job is finished, " +
+              "give up trying to be the master!");
+          isMaster = false;
+          return isMaster;
+        }
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "becomeMaster: Couldn't get state from " + jobState, e);
+      }
+      try {
+        List<String> masterChildArr =
+            getZkExt().getChildrenExt(
+                masterElectionPath, true, true, true);
+        String firstChild = masterChildArr.get(0);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("becomeMaster: First child is '" +
+              firstChild + "' and my bid is '" +
+              myBid + "'");
+        }
+        // The task whose bid is first in the returned list is the master
+        if (firstChild.equals(myBid)) {
+          // Counters only support increment, so add the difference to
+          // make the counter equal to this task's partition
+          currentMasterTaskPartitionCounter.increment(
+              getTaskPartition() -
+              currentMasterTaskPartitionCounter.getValue());
+          aggregatorWriter =
+              BspUtils.createAggregatorWriter(getConfiguration());
+          try {
+            aggregatorWriter.initialize(getContext(),
+                getApplicationAttempt());
+          } catch (IOException e) {
+            throw new IllegalStateException("becomeMaster: " +
+                "Couldn't initialize aggregatorWriter", e);
+          }
+          LOG.info("becomeMaster: I am now the master!");
+          isMaster = true;
+          return isMaster;
+        }
+        LOG.info("becomeMaster: Waiting to become the master...");
+        // Block until the election children change, then re-arm the event
+        getMasterElectionChildrenChangedEvent().waitForever();
+        getMasterElectionChildrenChangedEvent().reset();
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "becomeMaster: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "becomeMaster: InterruptedException", e);
+      }
+    }
+  }
+
+  /**
+   * Collect and aggregate the worker statistics for a particular superstep.
+   * As a side effect, allPartitionStatsList is cleared and refilled with
+   * every partition's stats read from the finished workers.
+   *
+   * @param superstep Superstep to aggregate on
+   * @return Global statistics aggregated on all worker statistics
+   */
+  private GlobalStats aggregateWorkerStats(long superstep) {
+    Class<? extends Writable> partitionStatsClass =
+        masterGraphPartitioner.createPartitionStats().getClass();
+    GlobalStats globalStats = new GlobalStats();
+    // Get the stats from the all the worker selected nodes
+    String workerFinishedPath =
+        getWorkerFinishedPath(getApplicationAttempt(), superstep);
+    List<String> workerFinishedPathList = null;
+    try {
+      workerFinishedPathList =
+          getZkExt().getChildrenExt(
+              workerFinishedPath, false, false, true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "aggregateWorkerStats: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "aggregateWorkerStats: InterruptedException", e);
+    }
+
+    allPartitionStatsList.clear();
+    for (String finishedPath : workerFinishedPathList) {
+      JSONObject workerFinishedInfoObj = null;
+      try {
+        byte [] zkData =
+            getZkExt().getData(finishedPath, false, null);
+        workerFinishedInfoObj = new JSONObject(new String(zkData));
+        List<? extends Writable> writableList =
+            WritableUtils.readListFieldsFromByteArray(
+                Base64.decode(workerFinishedInfoObj.getString(
+                    JSONOBJ_PARTITION_STATS_KEY)),
+                    partitionStatsClass,
+                    getConfiguration());
+        for (Writable writable : writableList) {
+          globalStats.addPartitionStats((PartitionStats) writable);
+          allPartitionStatsList.add((PartitionStats) writable);
+        }
+        // The message count is a single value on this worker's znode, so
+        // add it once per worker rather than once per partition.
+        globalStats.addMessageCount(
+            workerFinishedInfoObj.getLong(
+                JSONOBJ_NUM_MESSAGES_KEY));
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "aggregateWorkerStats: JSONException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "aggregateWorkerStats: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "aggregateWorkerStats: InterruptedException", e);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "aggregateWorkerStats: IOException", e);
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
+          " on superstep = " + getSuperstep());
+    }
+    return globalStats;
+  }
+
+  /**
+   * Get the aggregator values for a particular superstep,
+   * aggregate and save them. Does nothing on the INPUT_SUPERSTEP.
+   *
+   * @param superstep superstep to check
+   */
+  private void collectAndProcessAggregatorValues(long superstep) {
+    if (superstep == INPUT_SUPERSTEP) {
+      // Nothing to collect on the input superstep
+      return;
+    }
+    Map<String, Aggregator<? extends Writable>> aggregatorMap =
+        new TreeMap<String, Aggregator<? extends Writable>>();
+    String workerFinishedPath =
+        getWorkerFinishedPath(getApplicationAttempt(), superstep);
+    List<String> hostnameIdPathList = null;
+    try {
+      hostnameIdPathList =
+          getZkExt().getChildrenExt(
+              workerFinishedPath, false, false, true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "collectAndProcessAggregatorValues: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "collectAndProcessAggregatorValues: InterruptedException", e);
+    }
+
+    for (String hostnameIdPath : hostnameIdPathList) {
+      JSONObject workerFinishedInfoObj = null;
+      JSONArray aggregatorArray = null;
+      try {
+        byte [] zkData =
+            getZkExt().getData(hostnameIdPath, false, null);
+        workerFinishedInfoObj = new JSONObject(new String(zkData));
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: InterruptedException",
+            e);
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: JSONException", e);
+      }
+      try {
+        aggregatorArray = workerFinishedInfoObj.getJSONArray(
+            JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY);
+      } catch (JSONException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("collectAndProcessAggregatorValues: " +
+              "No aggregators" + " for " + hostnameIdPath);
+        }
+        continue;
+      }
+      for (int i = 0; i < aggregatorArray.length(); ++i) {
         try {
-            getFs().delete(finalizedCheckpointPath, false);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("collectAndProcessAggregatorValues: " +
+                "Getting aggregators from " +
+                aggregatorArray.getJSONObject(i));
+          }
+          String aggregatorName =
+              aggregatorArray.getJSONObject(i).getString(
+                  AGGREGATOR_NAME_KEY);
+          String aggregatorClassName =
+              aggregatorArray.getJSONObject(i).getString(
+                  AGGREGATOR_CLASS_NAME_KEY);
+          @SuppressWarnings("unchecked")
+          Aggregator<Writable> aggregator =
+            (Aggregator<Writable>) aggregatorMap.get(aggregatorName);
+          boolean firstTime = false;
+          if (aggregator == null) {
+            @SuppressWarnings("unchecked")
+            Aggregator<Writable> aggregatorWritable =
+              (Aggregator<Writable>) getAggregator(aggregatorName);
+            aggregator = aggregatorWritable;
+            if (aggregator == null) {
+              @SuppressWarnings("unchecked")
+              Class<? extends Aggregator<Writable>> aggregatorClass =
+                (Class<? extends Aggregator<Writable>>)
+                Class.forName(aggregatorClassName);
+              aggregator = registerAggregator(
+                  aggregatorName,
+                  aggregatorClass);
+            }
+            aggregatorMap.put(aggregatorName, aggregator);
+            firstTime = true;
+          }
+          Writable aggregatorValue =
+              aggregator.createAggregatedValue();
+          InputStream input =
+              new ByteArrayInputStream(
+                  Base64.decode(
+                      aggregatorArray.getJSONObject(i).
+                      getString(AGGREGATOR_VALUE_KEY)));
+          aggregatorValue.readFields(new DataInputStream(input));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("collectAndProcessAggregatorValues: " +
+                "aggregator value size=" + input.available() +
+                " for aggregator=" + aggregatorName +
+                " value=" + aggregatorValue);
+          }
+          if (firstTime) {
+            aggregator.setAggregatedValue(aggregatorValue);
+          } else {
+            aggregator.aggregate(aggregatorValue);
+          }
         } catch (IOException e) {
-            LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " +
-                     finalizedCheckpointPath);
-        }
-
-        // Format:
-        // <number of files>
-        // <used file prefix 0><used file prefix 1>...
-        // <aggregator data length><aggregators as a serialized JSON byte array>
-        FSDataOutputStream finalizedOutputStream =
-            getFs().create(finalizedCheckpointPath);
-        finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
-        for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
-            String chosenWorkerInfoPrefix =
-                getCheckpointBasePath(superstep) + "." +
-                chosenWorkerInfo.getHostnameId();
-            finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix);
-        }
-        String mergedAggregatorPath =
-            getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
-        if (getZkExt().exists(mergedAggregatorPath, false) != null) {
-            byte [] aggregatorZkData =
-                getZkExt().getData(mergedAggregatorPath, false, null);
-            finalizedOutputStream.writeInt(aggregatorZkData.length);
-            finalizedOutputStream.write(aggregatorZkData);
-        }
-        else {
-            finalizedOutputStream.writeInt(0);
-        }
-        finalizedOutputStream.close();
-        lastCheckpointedSuperstep = superstep;
-        lastCheckpointedSuperstepCounter.increment(superstep -
-            lastCheckpointedSuperstepCounter.getValue());
-    }
-
-    /**
-     * Assign the partitions for this superstep.  If there are changes,
-     * the workers will know how to do the exchange.  If this was a restarted
-     * superstep, then make sure to provide information on where to find the
-     * checkpoint file.
-     *
-     * @param allPartitionStatsList All partition stats
-     * @param chosenWorkerInfoList All the chosen worker infos
-     * @param masterGraphPartitioner Master graph partitioner
-     */
-    private void assignPartitionOwners(
-            List<PartitionStats> allPartitionStatsList,
-            List<WorkerInfo> chosenWorkerInfoList,
-            MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner) {
-        Collection<PartitionOwner> partitionOwners;
-        if (getSuperstep() == INPUT_SUPERSTEP ||
-                getSuperstep() == getRestartedSuperstep()) {
-            partitionOwners =
-                masterGraphPartitioner.createInitialPartitionOwners(
-                    chosenWorkerInfoList, maxWorkers);
-            if (partitionOwners.isEmpty()) {
-                throw new IllegalStateException(
-                    "assignAndExchangePartitions: No partition owners set");
-            }
-        } else {
-            partitionOwners =
-                masterGraphPartitioner.generateChangedPartitionOwners(
-                    allPartitionStatsList,
-                    chosenWorkerInfoList,
-                    maxWorkers,
-                    getSuperstep());
-
-            PartitionUtils.analyzePartitionStats(partitionOwners,
-                                                 allPartitionStatsList);
-        }
-
-        // If restarted, prepare the checkpoint restart
-        if (getRestartedSuperstep() == getSuperstep()) {
-            try {
-                prepareCheckpointRestart(getSuperstep(), partitionOwners);
-            } catch (IOException e) {
-                throw new IllegalStateException(
-                    "assignPartitionOwners: IOException on preparing", e);
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "assignPartitionOwners: KeeperException on preparing", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "assignPartitionOwners: InteruptedException on preparing",
-                    e);
-            }
-        }
-
-        // There will be some exchange of partitions
-        if (!partitionOwners.isEmpty()) {
-            String vertexExchangePath =
-                getPartitionExchangePath(getApplicationAttempt(),
-                                         getSuperstep());
-            try {
-                getZkExt().createOnceExt(vertexExchangePath,
-                                         null,
-                                         Ids.OPEN_ACL_UNSAFE,
-                                         CreateMode.PERSISTENT,
-                                         true);
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "assignPartitionOwners: KeeperException creating " +
-                    vertexExchangePath);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "assignPartitionOwners: InterruptedException creating " +
-                    vertexExchangePath);
-            }
-        }
-
-        // Workers are waiting for these assignments
-        String partitionAssignmentsPath =
-            getPartitionAssignmentsPath(getApplicationAttempt(),
-                                        getSuperstep());
-        WritableUtils.writeListToZnode(
-            getZkExt(),
-            partitionAssignmentsPath,
-            -1,
-            new ArrayList<Writable>(partitionOwners));
-    }
-
-    /**
-     * Check whether the workers chosen for this superstep are still alive
-     *
-     * @param chosenWorkerHealthPath Path to the healthy workers in ZooKeeper
-     * @param chosenWorkerList List of the healthy workers
-     * @return true if they are all alive, false otherwise.
-     * @throws InterruptedException
-     * @throws KeeperException
-     */
-    private boolean superstepChosenWorkerAlive(
-            String chosenWorkerInfoHealthPath,
-            List<WorkerInfo> chosenWorkerInfoList)
-            throws KeeperException, InterruptedException {
-        List<WorkerInfo> chosenWorkerInfoHealthyList =
-            getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
-        Set<WorkerInfo> chosenWorkerInfoHealthySet =
-            new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
-        boolean allChosenWorkersHealthy = true;
-        for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
-            if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
-                allChosenWorkersHealthy = false;
-                LOG.error("superstepChosenWorkerAlive: Missing chosen " +
-                          "worker " + chosenWorkerInfo +
-                          " on superstep " + getSuperstep());
-            }
-        }
-        return allChosenWorkersHealthy;
-    }
-
-    @Override
-    public void restartFromCheckpoint(long checkpoint) {
-        // Process:
-        // 1. Remove all old input split data
-        // 2. Increase the application attempt and set to the correct checkpoint
-        // 3. Send command to all workers to restart their tasks
+          throw new IllegalStateException(
+              "collectAndProcessAggregatorValues: " +
+                  "IOException when reading aggregator data " +
+                  aggregatorArray, e);
+        } catch (JSONException e) {
+          throw new IllegalStateException(
+              "collectAndProcessAggregatorValues: " +
+                  "JSONException when reading aggregator data " +
+                  aggregatorArray, e);
+        } catch (ClassNotFoundException e) {
+          throw new IllegalStateException(
+              "collectAndProcessAggregatorValues: " +
+                  "ClassNotFoundException when reading aggregator data " +
+                  aggregatorArray, e);
+        } catch (InstantiationException e) {
+          throw new IllegalStateException(
+              "collectAndProcessAggregatorValues: " +
+                  "InstantiationException when reading aggregator data " +
+                  aggregatorArray, e);
+        } catch (IllegalAccessException e) {
+          throw new IllegalStateException(
+              "collectAndProcessAggregatorValues: " +
+                  "IOException when reading aggregator data " +
+                  aggregatorArray, e);
+        }
+      }
+    }
+    if (aggregatorMap.size() > 0) {
+      String mergedAggregatorPath =
+          getMergedAggregatorPath(getApplicationAttempt(), superstep);
+      byte [] zkData = null;

[... 1279 lines stripped ...]