You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC

[9/23] GIRAPH-409: Refactor / cleanups (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
new file mode 100644
index 0000000..4483385
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -0,0 +1,1895 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.BspInputFormat;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.MasterServer;
+import org.apache.giraph.comm.netty.NettyMasterClient;
+import org.apache.giraph.comm.netty.NettyMasterServer;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.counters.GiraphStats;
+import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.graph.MapFunctions;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.graph.GlobalStats;
+import org.apache.giraph.graph.GraphMapper;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.InputSplitPaths;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.partition.MasterGraphPartitioner;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.partition.PartitionUtils;
+import org.apache.giraph.metrics.AggregatedMetrics;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphTimer;
+import org.apache.giraph.metrics.GiraphTimerContext;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import net.iharder.Base64;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class BspServiceMaster<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends BspService<I, V, E, M>
+    implements CentralizedServiceMaster<I, V, E, M>,
+    ResetSuperstepMetricsObserver {
+  /** Print worker names only if there are 10 workers left */
+  public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
+  /** How many threads to use when writing input splits to zookeeper*/
+  public static final String INPUT_SPLIT_THREAD_COUNT =
+      "giraph.inputSplitThreadCount";
+  /** Default number of threads to use when writing input splits to zookeeper */
+  public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
+  /** Time instance to use for timing */
+  private static final Time TIME = SystemTime.get();
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
+  /** Am I the master? */
+  private boolean isMaster = false;
+  /** Max number of workers */
+  private final int maxWorkers;
+  /** Min number of workers */
+  private final int minWorkers;
+  /** Min % responded workers */
+  private final float minPercentResponded;
+  /** Msecs to wait for an event */
+  private final int eventWaitMsecs;
+  /** Max msecs to wait for a superstep to get enough workers */
+  private final int maxSuperstepWaitMsecs;
+  /** Min number of long tails before printing */
+  private final int partitionLongTailMinPrint;
+  /** Last finalized checkpoint */
+  private long lastCheckpointedSuperstep = -1;
+  /** Worker wrote checkpoint */
+  private final BspEvent workerWroteCheckpoint;
+  /** State of the superstep changed */
+  private final BspEvent superstepStateChanged;
+  /** Master graph partitioner */
+  private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
+  /** All the partition stats from the last superstep */
+  private final List<PartitionStats> allPartitionStatsList =
+      new ArrayList<PartitionStats>();
+  /** Handler for aggregators */
+  private MasterAggregatorHandler aggregatorHandler;
+  /** Master class */
+  private MasterCompute masterCompute;
+  /** IPC Client */
+  private MasterClient masterClient;
+  /** IPC Server */
+  private MasterServer masterServer;
+  /** Master info */
+  private MasterInfo masterInfo;
+  /** List of workers in current superstep */
+  private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
+  /** Limit locality information added to each InputSplit znode */
+  private final int localityLimit = 5;
+  /** Observers over master lifecycle. */
+  private final MasterObserver[] observers;
+
+  // Per-Superstep Metrics
+  /** MasterCompute time */
+  private GiraphTimer masterComputeTimer;
+
+  /**
+   * Constructor for setting up the master.
+   *
+   * @param serverPortList ZooKeeper server port list
+   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
+   * @param context Mapper context
+   * @param graphMapper Graph mapper
+   */
+  public BspServiceMaster(
+      String serverPortList,
+      int sessionMsecTimeout,
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphMapper<I, V, E, M> graphMapper) {
+    super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    workerWroteCheckpoint = new PredicateLock(context);
+    registerBspEvent(workerWroteCheckpoint);
+    superstepStateChanged = new PredicateLock(context);
+    registerBspEvent(superstepStateChanged);
+
+    ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration();
+
+    maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, -1);
+    minWorkers = conf.getInt(GiraphConstants.MIN_WORKERS, -1);
+    minPercentResponded = conf.getFloat(
+        GiraphConstants.MIN_PERCENT_RESPONDED, 100.0f);
+    eventWaitMsecs = conf.getEventWaitMsecs();
+    maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
+    partitionLongTailMinPrint = conf.getInt(
+        GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT,
+        GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
+    masterGraphPartitioner =
+        getGraphPartitionerFactory().createMasterGraphPartitioner();
+    observers = getConfiguration().createMasterObservers();
+
+    GiraphMetrics.get().addSuperstepResetObserver(this);
+    GiraphStats.init(context);
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    masterComputeTimer = new GiraphTimer(superstepMetrics,
+        "master-compute-call", TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void setJobState(ApplicationState state,
+      long applicationAttempt,
+      long desiredSuperstep) {
+    JSONObject jobState = new JSONObject();
+    try {
+      jobState.put(JSONOBJ_STATE_KEY, state.toString());
+      jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
+      jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
+    } catch (JSONException e) {
+      throw new RuntimeException("setJobState: Couldn't put " +
+          state.toString());
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("setJobState: " + jobState.toString() + " on superstep " +
+          getSuperstep());
+    }
+    try {
+      getZkExt().createExt(masterJobStatePath + "/jobState",
+          jobState.toString().getBytes(),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT_SEQUENTIAL,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      throw new IllegalStateException(
+          "setJobState: Imposible that " +
+              masterJobStatePath + " already exists!", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "setJobState: Unknown KeeperException for " +
+              masterJobStatePath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "setJobState: Unknown InterruptedException for " +
+              masterJobStatePath, e);
+    }
+
+    if (state == ApplicationState.FAILED) {
+      failJob();
+    }
+  }
+
+  /**
+   * Common method for generating vertex/edge input splits.
+   *
+   * @param inputFormat The vertex/edge input format
+   * @param numWorkers Number of available workers
+   * @param inputSplitType Type of input splits (for logging purposes)
+   * @return List of input splits for the given format
+   */
+  private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
+                                               int numWorkers,
+                                               String inputSplitType) {
+    String logPrefix = "generate" + inputSplitType + "InputSplits";
+    List<InputSplit> splits;
+    try {
+      splits = inputFormat.getSplits(getContext(), numWorkers);
+    } catch (IOException e) {
+      throw new IllegalStateException(logPrefix + ": Got IOException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          logPrefix + ": Got InterruptedException", e);
+    }
+    float samplePercent =
+        getConfiguration().getFloat(
+            GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT,
+            GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
+    if (samplePercent !=
+        GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+      int lastIndex = (int) (samplePercent * splits.size() / 100f);
+      List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
+      LOG.warn(logPrefix + ": Using sampling - Processing only " +
+          sampleSplits.size() + " instead of " + splits.size() +
+          " expected splits.");
+      return sampleSplits;
+    } else {
+      if (LOG.isInfoEnabled()) {
+        LOG.info(logPrefix + ": Got " + splits.size() +
+            " input splits for " + numWorkers + " workers");
+      }
+      return splits;
+    }
+  }
+
+  /**
+   * When there is no salvaging this job, fail it.
+   */
+  private void failJob() {
+    LOG.fatal("failJob: Killing job " + getJobId());
+    try {
+      @SuppressWarnings("deprecation")
+      org.apache.hadoop.mapred.JobClient jobClient =
+          new org.apache.hadoop.mapred.JobClient(
+              (org.apache.hadoop.mapred.JobConf)
+              getContext().getConfiguration());
+      @SuppressWarnings("deprecation")
+      JobID jobId = JobID.forName(getJobId());
+      RunningJob job = jobClient.getJob(jobId);
+      failureCleanup(null);
+      job.killJob();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Parse the {@link WorkerInfo} objects from a ZooKeeper path
+   * (and children).
+   *
+   * @param workerInfosPath Path where all the workers are children
+   * @param watch Watch or not?
+   * @return List of workers in that path
+   */
+  private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
+      boolean watch) {
+    List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
+    List<String> workerInfoPathList;
+    try {
+      workerInfoPathList =
+          getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getWorkers: Got KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getWorkers: Got InterruptedStateException", e);
+    }
+    for (String workerInfoPath : workerInfoPathList) {
+      WorkerInfo workerInfo = new WorkerInfo();
+      WritableUtils.readFieldsFromZnode(
+          getZkExt(), workerInfoPath, true, null, workerInfo);
+      workerInfoList.add(workerInfo);
+    }
+    return workerInfoList;
+  }
+
+  /**
+   * Get the healthy and unhealthy {@link WorkerInfo} objects for
+   * a superstep
+   *
+   * @param superstep superstep to check
+   * @param healthyWorkerInfoList filled in with current data
+   * @param unhealthyWorkerInfoList filled in with current data
+   */
+  private void getAllWorkerInfos(
+      long superstep,
+      List<WorkerInfo> healthyWorkerInfoList,
+      List<WorkerInfo> unhealthyWorkerInfoList) {
+    String healthyWorkerInfoPath =
+        getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
+    String unhealthyWorkerInfoPath =
+        getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
+
+    try {
+      getZkExt().createOnceExt(healthyWorkerInfoPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("getWorkers: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("getWorkers: IllegalStateException", e);
+    }
+
+    try {
+      getZkExt().createOnceExt(unhealthyWorkerInfoPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("getWorkers: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("getWorkers: IllegalStateException", e);
+    }
+
+    List<WorkerInfo> currentHealthyWorkerInfoList =
+        getWorkerInfosFromPath(healthyWorkerInfoPath, true);
+    List<WorkerInfo> currentUnhealthyWorkerInfoList =
+        getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
+
+    healthyWorkerInfoList.clear();
+    if (currentHealthyWorkerInfoList != null) {
+      for (WorkerInfo healthyWorkerInfo :
+        currentHealthyWorkerInfoList) {
+        healthyWorkerInfoList.add(healthyWorkerInfo);
+      }
+    }
+
+    unhealthyWorkerInfoList.clear();
+    if (currentUnhealthyWorkerInfoList != null) {
+      for (WorkerInfo unhealthyWorkerInfo :
+        currentUnhealthyWorkerInfoList) {
+        unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
+      }
+    }
+  }
+
+  /**
+   * Check all the {@link WorkerInfo} objects to ensure that a minimum
+   * number of good workers exists out of the total that have reported.
+   *
+   * @return List of of healthy workers such that the minimum has been
+   *         met, otherwise null
+   */
+  private List<WorkerInfo> checkWorkers() {
+    boolean failJob = true;
+    long failWorkerCheckMsecs =
+        SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
+    List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
+    List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
+    int totalResponses = -1;
+    while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
+      getContext().progress();
+      getAllWorkerInfos(
+          getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
+      totalResponses = healthyWorkerInfoList.size() +
+          unhealthyWorkerInfoList.size();
+      if ((totalResponses * 100.0f / maxWorkers) >=
+          minPercentResponded) {
+        failJob = false;
+        break;
+      }
+      getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
+          "checkWorkers: Only found " +
+          totalResponses +
+          " responses of " + maxWorkers +
+          " needed to start superstep " +
+          getSuperstep());
+      if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
+          eventWaitMsecs)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("checkWorkers: Got event that health " +
+              "registration changed, not using poll attempt");
+        }
+        getWorkerHealthRegistrationChangedEvent().reset();
+        continue;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("checkWorkers: Only found " + totalResponses +
+            " responses of " + maxWorkers +
+            " needed to start superstep " +
+            getSuperstep() + ".  Reporting every" +
+            eventWaitMsecs + " msecs, " +
+            (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
+            " more msecs left before giving up.");
+        // Find the missing workers if there are only a few
+        if ((maxWorkers - totalResponses) <=
+            partitionLongTailMinPrint) {
+          logMissingWorkersOnSuperstep(healthyWorkerInfoList,
+              unhealthyWorkerInfoList);
+        }
+      }
+    }
+    if (failJob) {
+      LOG.error("checkWorkers: Did not receive enough processes in " +
+          "time (only " + totalResponses + " of " +
+          minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
+          "msecs).  This occurs if you do not have enough map tasks " +
+          "available simultaneously on your Hadoop instance to fulfill " +
+          "the number of requested workers.");
+      return null;
+    }
+
+    if (healthyWorkerInfoList.size() < minWorkers) {
+      LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
+          " available when " + minWorkers + " are required.");
+      logMissingWorkersOnSuperstep(healthyWorkerInfoList,
+          unhealthyWorkerInfoList);
+      return null;
+    }
+
+    getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
+        "checkWorkers: Done - Found " + totalResponses +
+        " responses of " + maxWorkers + " needed to start superstep " +
+        getSuperstep());
+
+    return healthyWorkerInfoList;
+  }
+
+  /**
+   * Log info level of the missing workers on the superstep
+   *
+   * @param healthyWorkerInfoList Healthy worker list
+   * @param unhealthyWorkerInfoList Unhealthy worker list
+   */
+  private void logMissingWorkersOnSuperstep(
+      List<WorkerInfo> healthyWorkerInfoList,
+      List<WorkerInfo> unhealthyWorkerInfoList) {
+    if (LOG.isInfoEnabled()) {
+      Set<Integer> partitionSet = new TreeSet<Integer>();
+      for (WorkerInfo workerInfo : healthyWorkerInfoList) {
+        partitionSet.add(workerInfo.getTaskId());
+      }
+      for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
+        partitionSet.add(workerInfo.getTaskId());
+      }
+      for (int i = 1; i <= maxWorkers; ++i) {
+        if (partitionSet.contains(Integer.valueOf(i))) {
+          continue;
+        } else if (i == getTaskPartition()) {
+          continue;
+        } else {
+          LOG.info("logMissingWorkersOnSuperstep: No response from " +
+              "partition " + i + " (could be master)");
+        }
+      }
+    }
+  }
+
+  /**
+   * Common method for creating vertex/edge input splits.
+   *
+   * @param inputFormat The vertex/edge input format
+   * @param inputSplitPaths ZooKeeper input split paths
+   * @param inputSplitType Type of input split (for logging purposes)
+   * @return Number of splits. Returns -1 on failure to create
+   *         valid input splits.
+   */
+  private int createInputSplits(GiraphInputFormat inputFormat,
+                                InputSplitPaths inputSplitPaths,
+                                String inputSplitType) {
+    String logPrefix = "create" + inputSplitType + "InputSplits";
+    // Only the 'master' should be doing this.  Wait until the number of
+    // processes that have reported health exceeds the minimum percentage.
+    // If the minimum percentage is not met, fail the job.  Otherwise
+    // generate the input splits
+    String inputSplitsPath = inputSplitPaths.getPath();
+    try {
+      if (getZkExt().exists(inputSplitsPath, false) != null) {
+        LOG.info(inputSplitsPath + " already exists, no need to create");
+        return Integer.parseInt(
+            new String(getZkExt().getData(inputSplitsPath, false, null)));
+      }
+    } catch (KeeperException.NoNodeException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info(logPrefix + ": Need to create the input splits at " +
+            inputSplitsPath);
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(logPrefix + ": KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(logPrefix + ": InterrtupedException", e);
+    }
+
+    // When creating znodes, in case the master has already run, resume
+    // where it left off.
+    List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
+    if (healthyWorkerInfoList == null) {
+      setJobState(ApplicationState.FAILED, -1, -1);
+      return -1;
+    }
+
+    // Note that the input splits may only be a sample if
+    // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
+    List<InputSplit> splitList = generateInputSplits(inputFormat,
+        healthyWorkerInfoList.size(), inputSplitType);
+
+    if (splitList.isEmpty()) {
+      LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
+          "check input of " + inputFormat.getClass().getName() + "!");
+      getContext().setStatus("Failing job due to 0 input splits, " +
+          "check input of " + inputFormat.getClass().getName() + "!");
+      failJob();
+    }
+    if (healthyWorkerInfoList.size() > splitList.size()) {
+      LOG.warn(logPrefix + ": Number of inputSplits=" +
+          splitList.size() + " < " +
+          healthyWorkerInfoList.size() +
+          "=number of healthy processes, " +
+          "some workers will be not used");
+    }
+
+    // Write input splits to zookeeper in parallel
+    int inputSplitThreadCount = getConfiguration().getInt(
+        INPUT_SPLIT_THREAD_COUNT,
+        DEFAULT_INPUT_SPLIT_THREAD_COUNT);
+    if (LOG.isInfoEnabled()) {
+      LOG.info(logPrefix + ": Starting to write input split data " +
+          "to zookeeper with " + inputSplitThreadCount + " threads");
+    }
+    ExecutorService taskExecutor =
+        Executors.newFixedThreadPool(inputSplitThreadCount);
+    for (int i = 0; i < splitList.size(); ++i) {
+      InputSplit inputSplit = splitList.get(i);
+      taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i));
+    }
+    taskExecutor.shutdown();
+    ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
+    if (LOG.isInfoEnabled()) {
+      LOG.info(logPrefix + ": Done writing input split data to zookeeper");
+    }
+
+    // Let workers know they can start trying to load the input splits
+    try {
+      getZkExt().createExt(inputSplitPaths.getAllReadyPath(),
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          false);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.info(logPrefix + ": Node " +
+          inputSplitPaths.getAllReadyPath() + " already exists.");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(logPrefix + ": KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
+    }
+
+    return splitList.size();
+  }
+
+  @Override
+  public int createVertexInputSplits() {
+    // Short-circuit if there is no vertex input format
+    if (!getConfiguration().hasVertexInputFormat()) {
+      return 0;
+    }
+    VertexInputFormat<I, V, E, M> vertexInputFormat =
+        getConfiguration().createVertexInputFormat();
+    return createInputSplits(vertexInputFormat, vertexInputSplitsPaths,
+        "Vertex");
+  }
+
+  @Override
+  public int createEdgeInputSplits() {
+    // Short-circuit if there is no edge input format
+    if (!getConfiguration().hasEdgeInputFormat()) {
+      return 0;
+    }
+    EdgeInputFormat<I, E> edgeInputFormat =
+        getConfiguration().createEdgeInputFormat();
+    return createInputSplits(edgeInputFormat, edgeInputSplitsPaths,
+        "Edge");
+  }
+
+  @Override
+  public List<WorkerInfo> getWorkerInfoList() {
+    return chosenWorkerInfoList;
+  }
+
+  @Override
+  public MasterAggregatorHandler getAggregatorHandler() {
+    return aggregatorHandler;
+  }
+
+  /**
+   * Read the finalized checkpoint file and associated metadata files for the
+   * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
+   * checkpoint prefixes.  It is an optimization to prevent all workers from
+   * searching all the files.  Also read in the aggregator data from the
+   * finalized checkpoint file and setting it.
+   *
+   * @param superstep Checkpoint set to examine.
+   * @param partitionOwners Partition owners to modify with checkpoint
+   *        prefixes
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  private void prepareCheckpointRestart(
+    long superstep,
+    Collection<PartitionOwner> partitionOwners)
+    throws IOException, KeeperException, InterruptedException {
+    FileSystem fs = getFs();
+    List<Path> validMetadataPathList = new ArrayList<Path>();
+    String finalizedCheckpointPath =
+        getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+    DataInputStream finalizedStream =
+        fs.open(new Path(finalizedCheckpointPath));
+    GlobalStats globalStats = new GlobalStats();
+    globalStats.readFields(finalizedStream);
+    updateCounters(globalStats);
+    int prefixFileCount = finalizedStream.readInt();
+    for (int i = 0; i < prefixFileCount; ++i) {
+      String metadataFilePath =
+          finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX;
+      validMetadataPathList.add(new Path(metadataFilePath));
+    }
+
+    aggregatorHandler.readFields(finalizedStream);
+    masterCompute.readFields(finalizedStream);
+    finalizedStream.close();
+
+    Map<Integer, PartitionOwner> idOwnerMap =
+        new HashMap<Integer, PartitionOwner>();
+    for (PartitionOwner partitionOwner : partitionOwners) {
+      if (idOwnerMap.put(partitionOwner.getPartitionId(),
+          partitionOwner) != null) {
+        throw new IllegalStateException(
+            "prepareCheckpointRestart: Duplicate partition " +
+                partitionOwner);
+      }
+    }
+    // Reading the metadata files.  Simply assign each partition owner
+    // the correct file prefix based on the partition id.
+    for (Path metadataPath : validMetadataPathList) {
+      String checkpointFilePrefix = metadataPath.toString();
+      checkpointFilePrefix =
+          checkpointFilePrefix.substring(
+              0,
+              checkpointFilePrefix.length() -
+              CHECKPOINT_METADATA_POSTFIX.length());
+      DataInputStream metadataStream = fs.open(metadataPath);
+      long partitions = metadataStream.readInt();
+      for (long i = 0; i < partitions; ++i) {
+        long dataPos = metadataStream.readLong();
+        int partitionId = metadataStream.readInt();
+        PartitionOwner partitionOwner = idOwnerMap.get(partitionId);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("prepareSuperstepRestart: File " + metadataPath +
+              " with position " + dataPos +
+              ", partition id = " + partitionId +
+              " assigned to " + partitionOwner);
+        }
+        partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix);
+      }
+      metadataStream.close();
+    }
+  }
+
+  @Override
+  public void setup() {
+    // Might have to manually load a checkpoint.
+    // In that case, the input splits are not set, they will be faked by
+    // the checkpoint files.  Each checkpoint file will be an input split
+    // and the input split
+
+    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+      GiraphStats.getInstance().getSuperstepCounter().
+        setValue(getRestartedSuperstep());
+    }
+    for (MasterObserver observer : observers) {
+      observer.preApplication();
+      getContext().progress();
+    }
+  }
+
+  @Override
+  public boolean becomeMaster() {
+    // Create my bid to become the master, then try to become the worker
+    // or return false.
+    String myBid = null;
+    try {
+      myBid =
+          getZkExt().createExt(masterElectionPath +
+              "/" + getHostnamePartitionId(),
+              null,
+              Ids.OPEN_ACL_UNSAFE,
+              CreateMode.EPHEMERAL_SEQUENTIAL,
+              true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "becomeMaster: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "becomeMaster: IllegalStateException", e);
+    }
+    while (true) {
+      JSONObject jobState = getJobState();
+      try {
+        if ((jobState != null) &&
+            ApplicationState.valueOf(
+                jobState.getString(JSONOBJ_STATE_KEY)) ==
+                ApplicationState.FINISHED) {
+          LOG.info("becomeMaster: Job is finished, " +
+              "give up trying to be the master!");
+          isMaster = false;
+          return isMaster;
+        }
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "becomeMaster: Couldn't get state from " + jobState, e);
+      }
+      try {
+        List<String> masterChildArr =
+            getZkExt().getChildrenExt(
+                masterElectionPath, true, true, true);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("becomeMaster: First child is '" +
+              masterChildArr.get(0) + "' and my bid is '" +
+              myBid + "'");
+        }
+        if (masterChildArr.get(0).equals(myBid)) {
+          GiraphStats.getInstance().getCurrentMasterTaskPartition().
+              setValue(getTaskPartition());
+          masterCompute = getConfiguration().createMasterCompute();
+          aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
+              getContext());
+          aggregatorHandler.initialize(this);
+
+          masterInfo = new MasterInfo();
+          masterServer =
+              new NettyMasterServer(getConfiguration(), this, getContext());
+          masterInfo.setInetSocketAddress(masterServer.getMyAddress());
+          masterInfo.setTaskId(getTaskPartition());
+          masterClient =
+              new NettyMasterClient(getContext(), getConfiguration(), this);
+
+          if (LOG.isInfoEnabled()) {
+            LOG.info("becomeMaster: I am now the master!");
+          }
+          isMaster = true;
+          return isMaster;
+        }
+        LOG.info("becomeMaster: Waiting to become the master...");
+        getMasterElectionChildrenChangedEvent().waitForever();
+        getMasterElectionChildrenChangedEvent().reset();
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "becomeMaster: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "becomeMaster: IllegalStateException", e);
+      }
+    }
+  }
+
+  @Override
+  public MasterInfo getMasterInfo() {
+    return masterInfo;
+  }
+
+  /**
+   * Collect and aggregate the worker statistics for a particular superstep.
+   *
+   * @param superstep Superstep to aggregate on
+   * @return Global statistics aggregated on all worker statistics
+   */
+  private GlobalStats aggregateWorkerStats(long superstep) {
+    ImmutableClassesGiraphConfiguration conf = getConfiguration();
+
+    Class<? extends PartitionStats> partitionStatsClass =
+        masterGraphPartitioner.createPartitionStats().getClass();
+    GlobalStats globalStats = new GlobalStats();
+    // Get the stats from the all the worker selected nodes
+    String workerFinishedPath =
+        getWorkerFinishedPath(getApplicationAttempt(), superstep);
+    List<String> workerFinishedPathList = null;
+    try {
+      workerFinishedPathList =
+          getZkExt().getChildrenExt(
+              workerFinishedPath, false, false, true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "aggregateWorkerStats: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "aggregateWorkerStats: InterruptedException", e);
+    }
+
+    AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();
+
+    allPartitionStatsList.clear();
+    for (String finishedPath : workerFinishedPathList) {
+      String hostnamePartitionId = FilenameUtils.getName(finishedPath);
+      JSONObject workerFinishedInfoObj = null;
+      try {
+        byte [] zkData =
+            getZkExt().getData(finishedPath, false, null);
+        workerFinishedInfoObj = new JSONObject(new String(zkData));
+        List<PartitionStats> statsList =
+            WritableUtils.readListFieldsFromByteArray(
+                Base64.decode(workerFinishedInfoObj.getString(
+                    JSONOBJ_PARTITION_STATS_KEY)),
+                    partitionStatsClass,
+                    conf);
+        for (PartitionStats partitionStats : statsList) {
+          globalStats.addPartitionStats(partitionStats);
+          allPartitionStatsList.add(partitionStats);
+        }
+        globalStats.addMessageCount(
+            workerFinishedInfoObj.getLong(
+                JSONOBJ_NUM_MESSAGES_KEY));
+        if (conf.metricsEnabled() &&
+            workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) {
+          WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics();
+          WritableUtils.readFieldsFromByteArray(
+              Base64.decode(
+                  workerFinishedInfoObj.getString(
+                      JSONOBJ_METRICS_KEY)),
+              workerMetrics);
+          aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
+        }
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "aggregateWorkerStats: JSONException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "aggregateWorkerStats: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "aggregateWorkerStats: InterruptedException", e);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "aggregateWorkerStats: IOException", e);
+      }
+    }
+
+    if (conf.metricsEnabled()) {
+      aggregatedMetrics.print(superstep);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
+          " on superstep = " + getSuperstep());
+    }
+    return globalStats;
+  }
+
+  /**
+   * Finalize the checkpoint file prefixes by taking the chosen workers and
+   * writing them to a finalized file.  Also write out the master
+   * aggregated aggregator array from the previous superstep.
+   *
+   * @param superstep superstep to finalize
+   * @param chosenWorkerInfoList list of chosen workers that will be finalized
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  private void finalizeCheckpoint(long superstep,
+    List<WorkerInfo> chosenWorkerInfoList)
+    throws IOException, KeeperException, InterruptedException {
+    Path finalizedCheckpointPath =
+        new Path(getCheckpointBasePath(superstep) +
+            CHECKPOINT_FINALIZED_POSTFIX);
+    try {
+      getFs().delete(finalizedCheckpointPath, false);
+    } catch (IOException e) {
+      LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " +
+          finalizedCheckpointPath);
+    }
+
+    // Format:
+    // <global statistics>
+    // <number of files>
+    // <used file prefix 0><used file prefix 1>...
+    // <aggregator data>
+    // <masterCompute data>
+    FSDataOutputStream finalizedOutputStream =
+        getFs().create(finalizedCheckpointPath);
+
+    String superstepFinishedNode =
+        getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
+    finalizedOutputStream.write(
+        getZkExt().getData(superstepFinishedNode, false, null));
+
+    finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
+    for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
+      String chosenWorkerInfoPrefix =
+          getCheckpointBasePath(superstep) + "." +
+              chosenWorkerInfo.getHostnameId();
+      finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix);
+    }
+    aggregatorHandler.write(finalizedOutputStream);
+    masterCompute.write(finalizedOutputStream);
+    finalizedOutputStream.close();
+    lastCheckpointedSuperstep = superstep;
+    GiraphStats.getInstance().
+        getLastCheckpointedSuperstep().setValue(superstep);
+  }
+
+  /**
+   * Assign the partitions for this superstep.  If there are changes,
+   * the workers will know how to do the exchange.  If this was a restarted
+   * superstep, then make sure to provide information on where to find the
+   * checkpoint file.
+   *
+   * @param allPartitionStatsList All partition stats
+   * @param chosenWorkerInfoList All the chosen worker infos
+   * @param masterGraphPartitioner Master graph partitioner
+   */
+  private void assignPartitionOwners(
+      List<PartitionStats> allPartitionStatsList,
+      List<WorkerInfo> chosenWorkerInfoList,
+      MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner) {
+    Collection<PartitionOwner> partitionOwners;
+    if (getSuperstep() == INPUT_SUPERSTEP ||
+        getSuperstep() == getRestartedSuperstep()) {
+      partitionOwners =
+          masterGraphPartitioner.createInitialPartitionOwners(
+              chosenWorkerInfoList, maxWorkers);
+      if (partitionOwners.isEmpty()) {
+        throw new IllegalStateException(
+            "assignAndExchangePartitions: No partition owners set");
+      }
+    } else {
+      partitionOwners =
+          masterGraphPartitioner.generateChangedPartitionOwners(
+              allPartitionStatsList,
+              chosenWorkerInfoList,
+              maxWorkers,
+              getSuperstep());
+
+      PartitionUtils.analyzePartitionStats(partitionOwners,
+          allPartitionStatsList);
+    }
+    checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
+
+    // If restarted, prepare the checkpoint restart
+    if (getRestartedSuperstep() == getSuperstep()) {
+      try {
+        prepareCheckpointRestart(getSuperstep(), partitionOwners);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "assignPartitionOwners: IOException on preparing", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "assignPartitionOwners: KeeperException on preparing", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "assignPartitionOwners: InteruptedException on preparing",
+            e);
+      }
+    }
+
+    // There will be some exchange of partitions
+    if (!partitionOwners.isEmpty()) {
+      String vertexExchangePath =
+          getPartitionExchangePath(getApplicationAttempt(),
+              getSuperstep());
+      try {
+        getZkExt().createOnceExt(vertexExchangePath,
+            null,
+            Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT,
+            true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "assignPartitionOwners: KeeperException creating " +
+                vertexExchangePath);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "assignPartitionOwners: InterruptedException creating " +
+                vertexExchangePath);
+      }
+    }
+
+    // Workers are waiting for these assignments
+    AddressesAndPartitionsWritable addressesAndPartitions =
+        new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
+            partitionOwners);
+    String addressesAndPartitionsPath =
+        getAddressesAndPartitionsPath(getApplicationAttempt(),
+            getSuperstep());
+    WritableUtils.writeToZnode(
+        getZkExt(),
+        addressesAndPartitionsPath,
+        -1,
+        addressesAndPartitions);
+  }
+
+  /**
+   * Check if partition ids are valid
+   *
+   * @param partitionOwners List of partition ids for current superstep
+   */
+  private void checkPartitions(Collection<PartitionOwner> partitionOwners) {
+    for (PartitionOwner partitionOwner : partitionOwners) {
+      int partitionId = partitionOwner.getPartitionId();
+      if (partitionId < 0 || partitionId >= partitionOwners.size()) {
+        throw new IllegalStateException("checkPartitions: " +
+            "Invalid partition id " + partitionId +
+            " - partition ids must be values from 0 to (numPartitions - 1)");
+      }
+    }
+  }
+
+  /**
+   * Check whether the workers chosen for this superstep are still alive
+   *
+   * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
+   * @param chosenWorkerInfoList List of the healthy workers
+   * @return true if they are all alive, false otherwise.
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  private boolean superstepChosenWorkerAlive(
+    String chosenWorkerInfoHealthPath,
+    List<WorkerInfo> chosenWorkerInfoList)
+    throws KeeperException, InterruptedException {
+    List<WorkerInfo> chosenWorkerInfoHealthyList =
+        getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
+    Set<WorkerInfo> chosenWorkerInfoHealthySet =
+        new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
+    boolean allChosenWorkersHealthy = true;
+    for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
+      if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
+        allChosenWorkersHealthy = false;
+        LOG.error("superstepChosenWorkerAlive: Missing chosen " +
+            "worker " + chosenWorkerInfo +
+            " on superstep " + getSuperstep());
+      }
+    }
+    return allChosenWorkersHealthy;
+  }
+
+  @Override
+  public void restartFromCheckpoint(long checkpoint) {
+    // Process:
+    // 1. Remove all old input split data
+    // 2. Increase the application attempt and set to the correct checkpoint
+    // 3. Send command to all workers to restart their tasks
+    try {
+      getZkExt().deleteExt(vertexInputSplitsPaths.getPath(), -1,
+          true);
+      getZkExt().deleteExt(edgeInputSplitsPaths.getPath(), -1,
+          true);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(
+          "restartFromCheckpoint: InterruptedException", e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(
+          "restartFromCheckpoint: KeeperException", e);
+    }
+    setApplicationAttempt(getApplicationAttempt() + 1);
+    setCachedSuperstep(checkpoint);
+    setRestartedSuperstep(checkpoint);
+    setJobState(ApplicationState.START_SUPERSTEP,
+        getApplicationAttempt(),
+        checkpoint);
+  }
+
+  /**
+   * Only get the finalized checkpoint files
+   */
+  public static class FinalizedCheckpointPathFilter implements PathFilter {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX);
+    }
+  }
+
+  @Override
+  public long getLastGoodCheckpoint() throws IOException {
+    // Find the last good checkpoint if none have been written to the
+    // knowledge of this master
+    if (lastCheckpointedSuperstep == -1) {
+      try {
+        FileStatus[] fileStatusArray =
+            getFs().listStatus(new Path(checkpointBasePath),
+                new FinalizedCheckpointPathFilter());
+        if (fileStatusArray == null) {
+          return -1;
+        }
+        Arrays.sort(fileStatusArray);
+        lastCheckpointedSuperstep = getCheckpoint(
+            fileStatusArray[fileStatusArray.length - 1].getPath());
+        if (LOG.isInfoEnabled()) {
+          LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
+              lastCheckpointedSuperstep + " from " +
+              fileStatusArray[fileStatusArray.length - 1].
+                  getPath().toString());
+        }
+      } catch (IOException e) {
+        LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
+            "found, killing the job.", e);
+        failJob();
+      }
+    }
+
+    return lastCheckpointedSuperstep;
+  }
+
+  /**
+   * Wait for a set of workers to signal that they are done with the
+   * barrier.
+   *
+   * @param finishedWorkerPath Path to where the workers will register their
+   *        hostname and id
+   * @param workerInfoList List of the workers to wait for
+   * @param event Event to wait on for a chance to be done.
+   * @return True if barrier was successful, false if there was a worker
+   *         failure
+   */
+  private boolean barrierOnWorkerList(String finishedWorkerPath,
+      List<WorkerInfo> workerInfoList,
+      BspEvent event) {
+    try {
+      getZkExt().createOnceExt(finishedWorkerPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "barrierOnWorkerList: KeeperException - Couldn't create " +
+              finishedWorkerPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "barrierOnWorkerList: InterruptedException - Couldn't create " +
+              finishedWorkerPath, e);
+    }
+    List<String> hostnameIdList =
+        new ArrayList<String>(workerInfoList.size());
+    for (WorkerInfo workerInfo : workerInfoList) {
+      hostnameIdList.add(workerInfo.getHostnameId());
+    }
+    String workerInfoHealthyPath =
+        getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
+    List<String> finishedHostnameIdList;
+    long nextInfoMillis = System.currentTimeMillis();
+    final int defaultTaskTimeoutMsec = 10 * 60 * 1000;  // from TaskTracker
+    final int taskTimeoutMsec = getContext().getConfiguration().getInt(
+        "mapred.task.timeout", defaultTaskTimeoutMsec);
+    while (true) {
+      try {
+        finishedHostnameIdList =
+            getZkExt().getChildrenExt(finishedWorkerPath,
+                true,
+                false,
+                false);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "barrierOnWorkerList: KeeperException - Couldn't get " +
+                "children of " + finishedWorkerPath, e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "barrierOnWorkerList: IllegalException - Couldn't get " +
+                "children of " + finishedWorkerPath, e);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("barrierOnWorkerList: Got finished worker list = " +
+            finishedHostnameIdList + ", size = " +
+            finishedHostnameIdList.size() +
+            ", worker list = " +
+            workerInfoList + ", size = " +
+            workerInfoList.size() +
+            " from " + finishedWorkerPath);
+      }
+
+      if (LOG.isInfoEnabled() &&
+          (System.currentTimeMillis() > nextInfoMillis)) {
+        nextInfoMillis = System.currentTimeMillis() + 30000;
+        LOG.info("barrierOnWorkerList: " +
+            finishedHostnameIdList.size() +
+            " out of " + workerInfoList.size() +
+            " workers finished on superstep " +
+            getSuperstep() + " on path " + finishedWorkerPath);
+        if (workerInfoList.size() - finishedHostnameIdList.size() <
+            MAX_PRINTABLE_REMAINING_WORKERS) {
+          Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
+          remainingWorkers.removeAll(finishedHostnameIdList);
+          LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
+        }
+      }
+      getContext().setStatus(getGraphMapper().getMapFunctions() + " - " +
+          finishedHostnameIdList.size() +
+          " finished out of " +
+          workerInfoList.size() +
+          " on superstep " + getSuperstep());
+      if (finishedHostnameIdList.containsAll(hostnameIdList)) {
+        break;
+      }
+
+      // Wait for a signal or timeout
+      event.waitMsecs(taskTimeoutMsec / 2);
+      event.reset();
+      getContext().progress();
+
+      // Did a worker die?
+      try {
+        if ((getSuperstep() > 0) &&
+            !superstepChosenWorkerAlive(
+                workerInfoHealthyPath,
+                workerInfoList)) {
+          return false;
+        }
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "barrierOnWorkerList: KeeperException - " +
+                "Couldn't get " + workerInfoHealthyPath, e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "barrierOnWorkerList: InterruptedException - " +
+                "Couldn't get " + workerInfoHealthyPath, e);
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Clean up old superstep data from Zookeeper
+   *
+   * @param removeableSuperstep Supersteo to clean up
+   * @throws InterruptedException
+   */
+  private void cleanUpOldSuperstep(long removeableSuperstep) throws
+      InterruptedException {
+    if (!(getConfiguration().getBoolean(
+        GiraphConstants.KEEP_ZOOKEEPER_DATA,
+        GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
+        (removeableSuperstep >= 0)) {
+      String oldSuperstepPath =
+          getSuperstepPath(getApplicationAttempt()) + "/" +
+              removeableSuperstep;
+      try {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
+              oldSuperstepPath);
+        }
+        getZkExt().deleteExt(oldSuperstepPath,
+            -1,
+            true);
+      } catch (KeeperException.NoNodeException e) {
+        LOG.warn("coordinateBarrier: Already cleaned up " +
+            oldSuperstepPath);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "coordinateSuperstep: KeeperException on " +
+                "finalizing checkpoint", e);
+      }
+    }
+  }
+
+  /**
+   * Coordinate the exchange of vertex/edge input splits among workers.
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   * @param inputSplitsType Type of input splits (for logging purposes)
+   */
+  private void coordinateInputSplits(InputSplitPaths inputSplitPaths,
+                                     InputSplitEvents inputSplitEvents,
+                                     String inputSplitsType) {
+    // Coordinate the workers finishing sending their vertices/edges to the
+    // correct workers and signal when everything is done.
+    String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
+    if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
+        chosenWorkerInfoList,
+        inputSplitEvents.getDoneStateChanged())) {
+      throw new IllegalStateException(logPrefix + ": Worker failed during " +
+          "input split (currently not supported)");
+    }
+    try {
+      getZkExt().createExt(inputSplitPaths.getAllDonePath(),
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          false);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.info("coordinateInputSplits: Node " +
+          inputSplitPaths.getAllDonePath() + " already exists.");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(logPrefix + ": KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
+    }
+  }
+
+  @Override
+  public SuperstepState coordinateSuperstep() throws
+  KeeperException, InterruptedException {
+    // 1. Get chosen workers and set up watches on them.
+    // 2. Assign partitions to the workers
+    //    (possibly reloading from a superstep)
+    // 3. Wait for all workers to complete
+    // 4. Collect and process aggregators
+    // 5. Create superstep finished node
+    // 6. If the checkpoint frequency is met, finalize the checkpoint
+
+    for (MasterObserver observer : observers) {
+      observer.preSuperstep();
+      getContext().progress();
+    }
+
+    chosenWorkerInfoList = checkWorkers();
+    if (chosenWorkerInfoList == null) {
+      LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
+          "superstep " + getSuperstep());
+      setJobState(ApplicationState.FAILED, -1, -1);
+    } else {
+      for (WorkerInfo workerInfo : chosenWorkerInfoList) {
+        String workerInfoHealthyPath =
+            getWorkerInfoHealthyPath(getApplicationAttempt(),
+                getSuperstep()) + "/" +
+                workerInfo.getHostnameId();
+        if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
+          LOG.warn("coordinateSuperstep: Chosen worker " +
+              workerInfoHealthyPath +
+              " is no longer valid, failing superstep");
+        }
+      }
+    }
+
+    masterClient.openConnections();
+
+    GiraphStats.getInstance().
+        getCurrentWorkers().setValue(chosenWorkerInfoList.size());
+    assignPartitionOwners(allPartitionStatsList,
+        chosenWorkerInfoList,
+        masterGraphPartitioner);
+
+    // We need to finalize aggregators from previous superstep (send them to
+    // worker owners) after new worker assignments
+    if (getSuperstep() >= 0) {
+      aggregatorHandler.finishSuperstep(masterClient);
+    }
+
+    // Finalize the valid checkpoint file prefixes and possibly
+    // the aggregators.
+    if (checkpointFrequencyMet(getSuperstep())) {
+      String workerWroteCheckpointPath =
+          getWorkerWroteCheckpointPath(getApplicationAttempt(),
+              getSuperstep());
+      // first wait for all the workers to write their checkpoint data
+      if (!barrierOnWorkerList(workerWroteCheckpointPath,
+          chosenWorkerInfoList,
+          getWorkerWroteCheckpointEvent())) {
+        return SuperstepState.WORKER_FAILURE;
+      }
+      try {
+        finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "coordinateSuperstep: IOException on finalizing checkpoint",
+            e);
+      }
+    }
+
+    if (getSuperstep() == INPUT_SUPERSTEP) {
+      if (getConfiguration().hasVertexInputFormat()) {
+        coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
+            "Vertex");
+      }
+      if (getConfiguration().hasEdgeInputFormat()) {
+        coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents,
+            "Edge");
+      }
+    }
+
+    String finishedWorkerPath =
+        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
+    if (!barrierOnWorkerList(finishedWorkerPath,
+        chosenWorkerInfoList,
+        getSuperstepStateChangedEvent())) {
+      return SuperstepState.WORKER_FAILURE;
+    }
+
+    // Collect aggregator values, then run the master.compute() and
+    // finally save the aggregator values
+    aggregatorHandler.prepareSuperstep(masterClient);
+    runMasterCompute(getSuperstep());
+
+    // If the master is halted or all the vertices voted to halt and there
+    // are no more messages in the system, stop the computation
+    GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
+    if (masterCompute.isHalted() ||
+        (globalStats.getFinishedVertexCount() ==
+        globalStats.getVertexCount() &&
+        globalStats.getMessageCount() == 0)) {
+      globalStats.setHaltComputation(true);
+    }
+
+    // Let everyone know the aggregated application state through the
+    // superstep finishing znode.
+    String superstepFinishedNode =
+        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
+    WritableUtils.writeToZnode(
+        getZkExt(), superstepFinishedNode, -1, globalStats);
+    updateCounters(globalStats);
+
+    cleanUpOldSuperstep(getSuperstep() - 1);
+    incrCachedSuperstep();
+    // Counter starts at zero, so no need to increment
+    if (getSuperstep() > 0) {
+      GiraphStats.getInstance().getSuperstepCounter().increment();
+    }
+    SuperstepState superstepState;
+    if (globalStats.getHaltComputation()) {
+      superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
+    } else {
+      superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
+    }
+    aggregatorHandler.writeAggregators(getSuperstep(), superstepState);
+
+    return superstepState;
+  }
+
+  /**
+   * Run the master.compute() class
+   *
+   * @param superstep superstep for which to run the master.compute()
+   */
+  private void runMasterCompute(long superstep) {
+    // The master.compute() should run logically before the workers, so
+    // increase the superstep counter it uses by one
+    GraphState<I, V, E, M> graphState =
+        new GraphState<I, V, E, M>(superstep + 1,
+            GiraphStats.getInstance().getVertices().getValue(),
+            GiraphStats.getInstance().getEdges().getValue(),
+            getContext(), getGraphMapper(), null, null);
+    masterCompute.setGraphState(graphState);
+    if (superstep == INPUT_SUPERSTEP) {
+      try {
+        masterCompute.initialize();
+      } catch (InstantiationException e) {
+        LOG.fatal("runMasterCompute: Failed in instantiation", e);
+        throw new RuntimeException(
+            "runMasterCompute: Failed in instantiation", e);
+      } catch (IllegalAccessException e) {
+        LOG.fatal("runMasterCompute: Failed in access", e);
+        throw new RuntimeException(
+            "runMasterCompute: Failed in access", e);
+      }
+    }
+    GiraphTimerContext timerContext = masterComputeTimer.time();
+    masterCompute.compute();
+    timerContext.stop();
+  }
+
+  /**
+   * Need to clean up ZooKeeper nicely.  Make sure all the masters and workers
+   * have reported ending their ZooKeeper connections.
+   */
+  private void cleanUpZooKeeper() {
+    try {
+      getZkExt().createExt(cleanedUpPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath +
+            " already exists, no need to create.");
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "cleanupZooKeeper: Got KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "cleanupZooKeeper: Got IllegalStateException", e);
+    }
+    // Need to wait for the number of workers and masters to complete
+    int maxTasks = BspInputFormat.getMaxTasks(getConfiguration());
+    if ((getGraphMapper().getMapFunctions() == MapFunctions.ALL) ||
+        (getGraphMapper().getMapFunctions() ==
+        MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
+      maxTasks *= 2;
+    }
+    List<String> cleanedUpChildrenList = null;
+    while (true) {
+      try {
+        cleanedUpChildrenList =
+            getZkExt().getChildrenExt(
+                cleanedUpPath, true, false, true);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("cleanUpZooKeeper: Got " +
+              cleanedUpChildrenList.size() + " of " +
+              maxTasks  +  " desired children from " +
+              cleanedUpPath);
+        }
+        if (cleanedUpChildrenList.size() == maxTasks) {
+          break;
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("cleanedUpZooKeeper: Waiting for the " +
+              "children of " + cleanedUpPath +
+              " to change since only got " +
+              cleanedUpChildrenList.size() + " nodes.");
+        }
+      } catch (KeeperException e) {
+        // We are in the cleanup phase -- just log the error
+        LOG.error("cleanUpZooKeeper: Got KeeperException, " +
+            "but will continue", e);
+        return;
+      } catch (InterruptedException e) {
+        // We are in the cleanup phase -- just log the error
+        LOG.error("cleanUpZooKeeper: Got InterruptedException, " +
+            "but will continue", e);
+        return;
+      }
+
+      getCleanedUpChildrenChangedEvent().waitForever();
+      getCleanedUpChildrenChangedEvent().reset();
+    }
+
+    // At this point, all processes have acknowledged the cleanup,
+    // and the master can do any final cleanup if the ZooKeeper service was
+    // provided (not dynamically started) and we don't want to keep the data
+    try {
+      if (getConfiguration().getZookeeperList() != null &&
+          !getConfiguration().getBoolean(
+              GiraphConstants.KEEP_ZOOKEEPER_DATA,
+              GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("cleanupZooKeeper: Removing the following path " +
+              "and all children - " + basePath + " from ZooKeeper list " +
+              getConfiguration().getZookeeperList());
+        }
+        getZkExt().deleteExt(basePath, -1, true);
+      }
+    } catch (KeeperException e) {
+      LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
+          basePath + " due to KeeperException", e);
+    } catch (InterruptedException e) {
+      LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
+          basePath + " due to InterruptedException", e);
+    }
+  }
+
+  @Override
+  public void postApplication() {
+    for (MasterObserver observer : observers) {
+      observer.postApplication();
+      getContext().progress();
+    }
+  }
+
+  @Override
+  public void postSuperstep() {
+    for (MasterObserver observer : observers) {
+      observer.postSuperstep();
+      getContext().progress();
+    }
+  }
+
+  @Override
+  public void failureCleanup(Exception e) {
+    for (MasterObserver observer : observers) {
+      try {
+        observer.applicationFailed(e);
+        // CHECKSTYLE: stop IllegalCatchCheck
+      } catch (RuntimeException re) {
+        // CHECKSTYLE: resume IllegalCatchCheck
+        LOG.error(re.getClass().getName() + " from observer " +
+            observer.getClass().getName(), re);
+      }
+      getContext().progress();
+    }
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    // All master processes should denote they are done by adding special
+    // znode.  Once the number of znodes equals the number of partitions
+    // for workers and masters, the master will clean up the ZooKeeper
+    // znodes associated with this job.
+    String masterCleanedUpPath = cleanedUpPath  + "/" +
+        getTaskPartition() + MASTER_SUFFIX;
+    try {
+      String finalFinishedPath =
+          getZkExt().createExt(masterCleanedUpPath,
+              null,
+              Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT,
+              true);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Notifying master its okay to cleanup with " +
+            finalFinishedPath);
+      }
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Couldn't create finished node '" +
+            masterCleanedUpPath);
+      }
+    } catch (KeeperException e) {
+      LOG.error("cleanup: Got KeeperException, continuing", e);
+    } catch (InterruptedException e) {
+      LOG.error("cleanup: Got InterruptedException, continuing", e);
+    }
+
+    if (isMaster) {
+      cleanUpZooKeeper();
+      // If desired, cleanup the checkpoint directory
+      if (getConfiguration().getBoolean(
+          GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
+          GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
+        boolean success =
+            getFs().delete(new Path(checkpointBasePath), true);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("cleanup: Removed HDFS checkpoint directory (" +
+              checkpointBasePath + ") with return = " +
+              success + " since the job " + getContext().getJobName() +
+              " succeeded ");
+        }
+      }
+      aggregatorHandler.close();
+
+      masterClient.closeConnections();
+      masterServer.close();
+    }
+
+    try {
+      getZkExt().close();
+    } catch (InterruptedException e) {
+      // cleanup phase -- just log the error
+      LOG.error("cleanup: Zookeeper failed to close", e);
+    }
+  }
+
+  /**
+   * Event that the master watches that denotes when a worker wrote checkpoint
+   *
+   * @return Event that denotes when a worker wrote checkpoint
+   */
+  public final BspEvent getWorkerWroteCheckpointEvent() {
+    return workerWroteCheckpoint;
+  }
+
+  /**
+   * Event that the master watches that denotes if a worker has done something
+   * that changes the state of a superstep (either a worker completed or died)
+   *
+   * @return Event that denotes a superstep state change
+   */
+  public final BspEvent getSuperstepStateChangedEvent() {
+    return superstepStateChanged;
+  }
+
+  /**
+   * Should this worker failure cause the current superstep to fail?
+   *
+   * @param failedWorkerPath Full path to the failed worker
+   */
+  private void checkHealthyWorkerFailure(String failedWorkerPath) {
+    if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
+      return;
+    }
+
+    Collection<PartitionOwner> partitionOwners =
+        masterGraphPartitioner.getCurrentPartitionOwners();
+    String hostnameId =
+        getHealthyHostnameIdFromPath(failedWorkerPath);
+    for (PartitionOwner partitionOwner : partitionOwners) {
+      WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
+      WorkerInfo previousWorkerInfo =
+          partitionOwner.getPreviousWorkerInfo();
+      if (workerInfo.getHostnameId().equals(hostnameId) ||
+          ((previousWorkerInfo != null) &&
+              previousWorkerInfo.getHostnameId().equals(hostnameId))) {
+        LOG.warn("checkHealthyWorkerFailure: " +
+            "at least one healthy worker went down " +
+            "for superstep " + getSuperstep() + " - " +
+            hostnameId + ", will try to restart from " +
+            "checkpointed superstep " +
+            lastCheckpointedSuperstep);
+        superstepStateChanged.signal();
+      }
+    }
+  }
+
+  @Override
+  public boolean processEvent(WatchedEvent event) {
+    boolean foundEvent = false;
+    if (event.getPath().contains(WORKER_HEALTHY_DIR) &&
+        (event.getType() == EventType.NodeDeleted)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("processEvent: Healthy worker died (node deleted) " +
+            "in " + event.getPath());
+      }
+      checkHealthyWorkerFailure(event.getPath());
+      superstepStateChanged.signal();
+      foundEvent = true;
+    } else if (event.getPath().contains(WORKER_FINISHED_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("processEvent: Worker finished (node change) " +
+            "event - superstepStateChanged signaled");
+      }
+      superstepStateChanged.signal();
+      foundEvent = true;
+    } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
+            "event - workerWroteCheckpoint signaled");
+      }
+      workerWroteCheckpoint.signal();
+      foundEvent = true;
+    }
+
+    return foundEvent;
+  }
+
+  /**
+   * Set values of counters to match the ones from {@link GlobalStats}
+   *
+   * @param globalStats Global statistics which holds new counter values
+   */
+  private void updateCounters(GlobalStats globalStats) {
+    GiraphStats gs = GiraphStats.getInstance();
+    gs.getVertices().setValue(globalStats.getVertexCount());
+    gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
+    gs.getEdges().setValue(globalStats.getEdgeCount());
+    gs.getSentMessages().setValue(globalStats.getMessageCount());
+  }
+
+  /**
+   * Task that writes a given input split to zookeeper.
+   * Upon failure call() throws an exception.
+   */
+  private class WriteInputSplit implements Callable<Void> {
+    /** Input split which we are going to write */
+    private final InputSplit inputSplit;
+    /** Input splits path */
+    private final String inputSplitsPath;
+    /** Index of the input split */
+    private final int index;
+
+    /**
+     * Constructor
+     *
+     * @param inputSplit Input split which we are going to write
+     * @param inputSplitsPath Input splits path
+     * @param index Index of the input split
+     */
+    public WriteInputSplit(InputSplit inputSplit,
+                           String inputSplitsPath,
+                           int index) {
+      this.inputSplit = inputSplit;
+      this.inputSplitsPath = inputSplitsPath;
+      this.index = index;
+    }
+
+    @Override
+    public Void call() {
+      String inputSplitPath = null;
+      try {
+        ByteArrayOutputStream byteArrayOutputStream =
+            new ByteArrayOutputStream();
+        DataOutput outputStream =
+            new DataOutputStream(byteArrayOutputStream);
+
+        String[] splitLocations = inputSplit.getLocations();
+        StringBuilder locations = null;
+        if (splitLocations != null) {
+          int splitListLength =
+              Math.min(splitLocations.length, localityLimit);
+          locations = new StringBuilder();
+          for (String location : splitLocations) {
+            locations.append(location)
+                .append(--splitListLength > 0 ? "\t" : "");
+          }
+        }
+        Text.writeString(outputStream,
+            locations == null ? "" : locations.toString());
+        Text.writeString(outputStream,
+            inputSplit.getClass().getName());
+        ((Writable) inputSplit).write(outputStream);
+        inputSplitPath = inputSplitsPath + "/" + index;
+        getZkExt().createExt(inputSplitPath,
+            byteArrayOutputStream.toByteArray(),
+            Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT,
+            true);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("call: Created input split " +
+              "with index " + index + " serialized as " +
+              byteArrayOutputStream.toString());
+        }
+      } catch (KeeperException.NodeExistsException e) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("call: Node " +
+              inputSplitPath + " already exists.");
+        }
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "call: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "call: IllegalStateException", e);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "call: IOException", e);
+      }
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterCompute.java
new file mode 100644
index 0000000..bfb6f0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterCompute.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A dumb implementation of {@link MasterCompute}. This is the default
+ * implementation when no MasterCompute is defined by the user. It does
+ * nothing.
+ */
+
+public class DefaultMasterCompute extends MasterCompute {
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+  }
+
+  @Override
+  public void compute() {
+  }
+
+  @Override
+  public void initialize() throws InstantiationException,
+      IllegalAccessException {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
new file mode 100644
index 0000000..aab1183
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.aggregators.AggregatorWrapper;
+import org.apache.giraph.aggregators.AggregatorWriter;
+import org.apache.giraph.bsp.BspService;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Map;
+
+/** Handler for aggregators on master */
+public class MasterAggregatorHandler implements MasterAggregatorUsage,
+    Writable {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(MasterAggregatorHandler.class);
+  /**
+   * Map of aggregators.
+   * This map is used to store final aggregated values received from worker
+   * owners, and also to read and write values provided during master.compute.
+   */
+  private final Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+      Maps.newHashMap();
+  /** Aggregator writer */
+  private final AggregatorWriter aggregatorWriter;
+  /** Progressable used to report progress */
+  private final Progressable progressable;
+
+  /**
+   * Constructor
+   *
+   * @param conf         Giraph configuration
+   * @param progressable Progressable used for reporting progress
+   */
+  public MasterAggregatorHandler(
+      ImmutableClassesGiraphConfiguration<?, ?, ?, ?> conf,
+      Progressable progressable) {
+    this.progressable = progressable;
+    aggregatorWriter = conf.createAggregatorWriter();
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
+    if (aggregator == null) {
+      return null;
+    } else {
+      return (A) aggregator.getPreviousAggregatedValue();
+    }
+  }
+
+  @Override
+  public <A extends Writable> void setAggregatedValue(String name, A value) {
+    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
+    if (aggregator == null) {
+      throw new IllegalStateException(
+          "setAggregatedValue: Tried to set value of aggregator which wasn't" +
+              " registered " + name);
+    }
+    ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
+  }
+
+  @Override
+  public <A extends Writable> boolean registerAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    checkAggregatorName(name);
+    return registerAggregator(name, aggregatorClass, false) != null;
+  }
+
+  @Override
+  public <A extends Writable> boolean registerPersistentAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    checkAggregatorName(name);
+    return registerAggregator(name, aggregatorClass, true) != null;
+  }
+
+  /**
+   * Make sure user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR as
+   * the name of aggregator. Throw an exception if he tries to use it.
+   *
+   * @param name Name of the aggregator to check.
+   */
+  private void checkAggregatorName(String name) {
+    if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
+      throw new IllegalStateException("checkAggregatorName: " +
+          AggregatorUtils.SPECIAL_COUNT_AGGREGATOR +
+          " is not allowed for the name of aggregator");
+    }
+  }
+
+  /**
+   * Helper function for registering aggregators.
+   *
+   * @param name            Name of the aggregator
+   * @param aggregatorClass Class of the aggregator
+   * @param persistent      Whether aggregator is persistent or not
+   * @param <A>             Aggregated value type
+   * @return Newly registered aggregator or aggregator which was previously
+   *         created with selected name, if any
+   */
+  private <A extends Writable> AggregatorWrapper<A> registerAggregator
+  (String name, Class<? extends Aggregator<A>> aggregatorClass,
+      boolean persistent) throws InstantiationException,
+      IllegalAccessException {
+    AggregatorWrapper<A> aggregatorWrapper =
+        (AggregatorWrapper<A>) aggregatorMap.get(name);
+    if (aggregatorWrapper == null) {
+      aggregatorWrapper =
+          new AggregatorWrapper<A>(aggregatorClass, persistent);
+      aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
+    }
+    return aggregatorWrapper;
+  }
+
+  /**
+   * Prepare aggregators for current superstep
+   *
+   * @param masterClient IPC client on master
+   */
+  public void prepareSuperstep(MasterClient masterClient) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Start preapring aggregators");
+    }
+    // prepare aggregators for master compute
+    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
+      if (aggregator.isPersistent()) {
+        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
+      }
+      aggregator.setPreviousAggregatedValue(
+          aggregator.getCurrentAggregatedValue());
+      aggregator.resetCurrentAggregator();
+      progressable.progress();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Aggregators prepared");
+    }
+  }
+
+  /**
+   * Finalize aggregators for current superstep and share them with workers
+   *
+   * @param masterClient IPC client on master
+   */
+  public void finishSuperstep(MasterClient masterClient) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Start finishing aggregators");
+    }
+    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
+      if (aggregator.isChanged()) {
+        // if master compute changed the value, use the one he chose
+        aggregator.setPreviousAggregatedValue(
+            aggregator.getCurrentAggregatedValue());
+        // reset aggregator for the next superstep
+        aggregator.resetCurrentAggregator();
+      }
+      progressable.progress();
+    }
+
+    // send aggregators to their owners
+    // TODO: if aggregator owner and it's value didn't change,
+    //       we don't need to resend it
+    try {
+      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+          aggregatorMap.entrySet()) {
+        masterClient.sendAggregator(entry.getKey(),
+            entry.getValue().getAggregatorClass(),
+            entry.getValue().getPreviousAggregatedValue());
+        progressable.progress();
+      }
+      masterClient.finishSendingAggregatedValues();
+    } catch (IOException e) {
+      throw new IllegalStateException("finishSuperstep: " +
+          "IOException occurred while sending aggregators", e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Aggregators finished");
+    }
+  }
+
+  /**
+   * Accept aggregated values sent by worker. Every aggregator will be sent
+   * only once, by its owner.
+   * We don't need to count the number of these requests because global
+   * superstep barrier will happen after workers ensure all requests of this
+   * type have been received and processed by master.
+   *
+   * @param aggregatedValuesInput Input in which aggregated values are
+   *                              written in the following format:
+   *                              number_of_aggregators
+   *                              name_1  value_1
+   *                              name_2  value_2
+   *                              ...
+   * @throws IOException
+   */
+  public void acceptAggregatedValues(
+      DataInput aggregatedValuesInput) throws IOException {
+    int numAggregators = aggregatedValuesInput.readInt();
+    for (int i = 0; i < numAggregators; i++) {
+      String aggregatorName = aggregatedValuesInput.readUTF();
+      AggregatorWrapper<Writable> aggregator =
+          aggregatorMap.get(aggregatorName);
+      if (aggregator == null) {
+        throw new IllegalStateException(
+            "acceptAggregatedValues: " +
+                "Master received aggregator which isn't registered: " +
+                aggregatorName);
+      }
+      Writable aggregatorValue = aggregator.createInitialValue();
+      aggregatorValue.readFields(aggregatedValuesInput);
+      aggregator.setCurrentAggregatedValue(aggregatorValue);
+      progressable.progress();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("acceptAggregatedValues: Accepted one set with " +
+          numAggregators + " aggregated values");
+    }
+  }
+
+  /**
+   * Write aggregators to {@link AggregatorWriter}
+   *
+   * @param superstep      Superstep which just finished
+   * @param superstepState State of the superstep which just finished
+   */
+  public void writeAggregators(long superstep, SuperstepState superstepState) {
+    try {
+      Iterable<Map.Entry<String, Writable>> iter =
+          Iterables.transform(
+              aggregatorMap.entrySet(),
+              new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
+                  Map.Entry<String, Writable>>() {
+                @Override
+                public Map.Entry<String, Writable> apply(
+                    Map.Entry<String, AggregatorWrapper<Writable>> entry) {
+                  progressable.progress();
+                  return new AbstractMap.SimpleEntry<String,
+                      Writable>(entry.getKey(),
+                      entry.getValue().getPreviousAggregatedValue());
+                }
+              });
+      aggregatorWriter.writeAggregator(iter,
+          (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
+              AggregatorWriter.LAST_SUPERSTEP : superstep);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "coordinateSuperstep: IOException while " +
+              "writing aggregators data", e);
+    }
+  }
+
+  /**
+   * Initialize {@link AggregatorWriter}
+   *
+   * @param service BspService
+   */
+  public void initialize(BspService service) {
+    try {
+      aggregatorWriter.initialize(service.getContext(),
+          service.getApplicationAttempt());
+    } catch (IOException e) {
+      throw new IllegalStateException("initialize: " +
+          "Couldn't initialize aggregatorWriter", e);
+    }
+  }
+
+  /**
+   * Close {@link AggregatorWriter}
+   *
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    aggregatorWriter.close();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(aggregatorMap.size());
+    for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+        aggregatorMap.entrySet()) {
+      out.writeUTF(entry.getKey());
+      out.writeUTF(entry.getValue().getAggregatorClass().getName());
+      out.writeBoolean(entry.getValue().isPersistent());
+      entry.getValue().getPreviousAggregatedValue().write(out);
+      progressable.progress();
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    aggregatorMap.clear();
+    int numAggregators = in.readInt();
+    try {
+      for (int i = 0; i < numAggregators; i++) {
+        String aggregatorName = in.readUTF();
+        String aggregatorClassName = in.readUTF();
+        boolean isPersistent = in.readBoolean();
+        AggregatorWrapper<Writable> aggregator = registerAggregator(
+            aggregatorName,
+            AggregatorUtils.getAggregatorClass(aggregatorClassName),
+            isPersistent);
+        Writable value = aggregator.createInitialValue();
+        value.readFields(in);
+        aggregator.setPreviousAggregatedValue(value);
+        progressable.progress();
+      }
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("readFields: " +
+          "InstantiationException occurred", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("readFields: " +
+          "IllegalAccessException occurred", e);
+    }
+  }
+}