You are viewing a plain-text version of this message; the link to its canonical version was not preserved in this copy.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/01/08 22:55:41 UTC

git commit: updated refs/heads/trunk to ca442de

Updated Branches:
  refs/heads/trunk 20f8df00e -> ca442deeb


GIRAPH-742: Worker task finishSuperstep status reporting the wrong superstep number (cmuchins via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ca442dee
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ca442dee
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ca442dee

Branch: refs/heads/trunk
Commit: ca442deeb5c63a8d7bcab796067b712de30e910c
Parents: 20f8df0
Author: Maja Kabiljo <ma...@fb.com>
Authored: Wed Jan 8 13:55:13 2014 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Wed Jan 8 13:55:13 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |    3 +
 .../apache/giraph/worker/BspServiceWorker.java  |    2 +-
 .../giraph/worker/BspServiceWorker.java.orig    | 1535 ------------------
 3 files changed, 4 insertions(+), 1536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/ca442dee/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 645cd72..6d77b5b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-742: Worker task finishSuperstep status reporting the wrong superstep number
+  (cmuchins via majakabiljo)
+
   GIRAPH-810: Giraph should track aggregate statistics over lifetime of the computation
   (rvesse via majakabiljo)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca442dee/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index e62c081..bc29b03 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -789,11 +789,11 @@ public class BspServiceWorker<I extends WritableComparable,
           " with global stats " + globalStats + " and classes " +
           superstepClasses);
     }
-    incrCachedSuperstep();
     getContext().setStatus("finishSuperstep: (all workers done) " +
         getGraphTaskManager().getGraphFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
+    incrCachedSuperstep();
     getConfiguration().updateSuperstepClasses(superstepClasses);
 
     return new FinishedSuperstepStats(

http://git-wip-us.apache.org/repos/asf/giraph/blob/ca442dee/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig
deleted file mode 100644
index 9311fbd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig
+++ /dev/null
@@ -1,1535 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import org.apache.giraph.bsp.ApplicationState;
-import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.WorkerClient;
-import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
-import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
-import org.apache.giraph.comm.netty.NettyWorkerClient;
-import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.comm.netty.NettyWorkerServer;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.AddressesAndPartitionsWritable;
-import org.apache.giraph.graph.FinishedSuperstepStats;
-import org.apache.giraph.graph.GlobalStats;
-import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexEdgeCount;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.giraph.io.superstep_output.SuperstepOutput;
-import org.apache.giraph.master.MasterInfo;
-import org.apache.giraph.master.SuperstepClasses;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.GiraphTimer;
-import org.apache.giraph.metrics.GiraphTimerContext;
-import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
-import org.apache.giraph.metrics.SuperstepMetricsRegistry;
-import org.apache.giraph.metrics.WorkerSuperstepMetrics;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionExchange;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStats;
-import org.apache.giraph.partition.PartitionStore;
-import org.apache.giraph.partition.WorkerGraphPartitioner;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.LoggerUtils;
-import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.utils.ProgressableUtils;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import net.iharder.Base64;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- */
-@SuppressWarnings("rawtypes")
-public class BspServiceWorker<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends BspService<I, V, E>
-    implements CentralizedServiceWorker<I, V, E>,
-    ResetSuperstepMetricsObserver {
-  /** Name of gauge for time spent waiting on other workers */
-  public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
-  /** My process health znode */
-  private String myHealthZnode;
-  /** Worker info */
-  private final WorkerInfo workerInfo;
-  /** Worker graph partitioner */
-  private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
-
-  /** IPC Client */
-  private final WorkerClient<I, V, E> workerClient;
-  /** IPC Server */
-  private final WorkerServer<I, V, E> workerServer;
-  /** Request processor for aggregator requests */
-  private final WorkerAggregatorRequestProcessor
-  workerAggregatorRequestProcessor;
-  /** Master info */
-  private MasterInfo masterInfo = new MasterInfo();
-  /** List of workers */
-  private List<WorkerInfo> workerInfoList = Lists.newArrayList();
-  /** Have the partition exchange children (workers) changed? */
-  private final BspEvent partitionExchangeChildrenChanged;
-
-  /** Worker Context */
-  private final WorkerContext workerContext;
-
-  /** Handler for aggregators */
-  private final WorkerAggregatorHandler aggregatorHandler;
-
-  /** Superstep output */
-  private SuperstepOutput<I, V, E> superstepOutput;
-
-  /** array of observers to call back to */
-  private final WorkerObserver[] observers;
-
-  // Per-Superstep Metrics
-  /** Timer for WorkerContext#postSuperstep */
-  private GiraphTimer wcPostSuperstepTimer;
-  /** Time spent waiting on requests to finish */
-  private GiraphTimer waitRequestsTimer;
-
-  /**
-   * Constructor for setting up the worker.
-   *
-   * @param serverPortList ZooKeeper server port list
-   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
-   * @param context Mapper context
-   * @param graphTaskManager GraphTaskManager for this compute node
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public BspServiceWorker(
-    String serverPortList,
-    int sessionMsecTimeout,
-    Mapper<?, ?, ?, ?>.Context context,
-    GraphTaskManager<I, V, E> graphTaskManager)
-    throws IOException, InterruptedException {
-    super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
-    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
-    partitionExchangeChildrenChanged = new PredicateLock(context);
-    registerBspEvent(partitionExchangeChildrenChanged);
-    workerGraphPartitioner =
-        getGraphPartitionerFactory().createWorkerGraphPartitioner();
-    workerInfo = new WorkerInfo();
-    workerServer = new NettyWorkerServer<I, V, E>(conf, this, context);
-    workerInfo.setInetSocketAddress(workerServer.getMyAddress());
-    workerInfo.setTaskId(getTaskPartition());
-    workerClient = new NettyWorkerClient<I, V, E>(context, conf, this);
-
-    workerAggregatorRequestProcessor =
-        new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
-
-    aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
-
-    workerContext = conf.createWorkerContext();
-    workerContext.setWorkerAggregatorUsage(aggregatorHandler);
-
-    superstepOutput = conf.createSuperstepOutput(context);
-
-    if (conf.isJMapHistogramDumpEnabled()) {
-      conf.addWorkerObserverClass(JMapHistoDumper.class);
-    }
-    observers = conf.createWorkerObservers();
-
-    GiraphMetrics.get().addSuperstepResetObserver(this);
-  }
-
-  @Override
-  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
-    waitRequestsTimer = new GiraphTimer(superstepMetrics,
-        TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
-    wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
-        "worker-context-post-superstep", TimeUnit.MICROSECONDS);
-  }
-
-  @Override
-  public WorkerContext getWorkerContext() {
-    return workerContext;
-  }
-
-  @Override
-  public WorkerObserver[] getWorkerObservers() {
-    return observers;
-  }
-
-  @Override
-  public WorkerClient<I, V, E> getWorkerClient() {
-    return workerClient;
-  }
-
-  /**
-   * Intended to check the health of the node.  For instance, can it ssh,
-   * dmesg, etc. For now, does nothing.
-   * TODO: Make this check configurable by the user (i.e. search dmesg for
-   * problems).
-   *
-   * @return True if healthy (always in this case).
-   */
-  public boolean isHealthy() {
-    return true;
-  }
-
-  /**
-   * Load the vertices/edges from input slits. Do this until all the
-   * InputSplits have been processed.
-   * All workers will try to do as many InputSplits as they can.  The master
-   * will monitor progress and stop this once all the InputSplits have been
-   * loaded and check-pointed.  Keep track of the last input split path to
-   * ensure the input split cache is flushed prior to marking the last input
-   * split complete.
-   *
-   * Use one or more threads to do the loading.
-   *
-   * @param inputSplitPathList List of input split paths
-   * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
-   * @return Statistics of the vertices and edges loaded
-   * @throws InterruptedException
-   * @throws KeeperException
-   */
-  private VertexEdgeCount loadInputSplits(
-      List<String> inputSplitPathList,
-      CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
-    throws KeeperException, InterruptedException {
-    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-    // Determine how many threads to use based on the number of input splits
-    int maxInputSplitThreads = (inputSplitPathList.size() - 1) /
-        getConfiguration().getMaxWorkers() + 1;
-    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
-        maxInputSplitThreads);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
-          "originally " + getConfiguration().getNumInputSplitsThreads() +
-          " threads(s) for " + inputSplitPathList.size() + " total splits.");
-    }
-
-    List<VertexEdgeCount> results =
-        ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
-            numThreads, "load-%d", getContext());
-    for (VertexEdgeCount result : results) {
-      vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
-    }
-
-    workerClient.waitAllRequests();
-    return vertexEdgeCount;
-  }
-
-
-  /**
-   * Load the vertices from the user-defined
-   * {@link org.apache.giraph.io.VertexReader}
-   *
-   * @return Count of vertices and edges loaded
-   */
-  private VertexEdgeCount loadVertices() throws KeeperException,
-      InterruptedException {
-    List<String> inputSplitPathList =
-        getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
-            false, false, true);
-
-    InputSplitPathOrganizer splitOrganizer =
-        new InputSplitPathOrganizer(getZkExt(),
-            inputSplitPathList, getWorkerInfo().getHostname(),
-            getConfiguration().useInputSplitLocality());
-    InputSplitsHandler splitsHandler = new InputSplitsHandler(
-        splitOrganizer,
-        getZkExt(),
-        getContext(),
-        BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
-        BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
-
-    VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
-        new VertexInputSplitsCallableFactory<I, V, E>(
-            getConfiguration().createWrappedVertexInputFormat(),
-            getContext(),
-            getConfiguration(),
-            this,
-            splitsHandler,
-            getZkExt());
-
-    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
-  }
-
-  /**
-   * Load the edges from the user-defined
-   * {@link org.apache.giraph.io.EdgeReader}.
-   *
-   * @return Number of edges loaded
-   */
-  private long loadEdges() throws KeeperException, InterruptedException {
-    List<String> inputSplitPathList =
-        getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
-            false, false, true);
-
-    InputSplitPathOrganizer splitOrganizer =
-        new InputSplitPathOrganizer(getZkExt(),
-            inputSplitPathList, getWorkerInfo().getHostname(),
-            getConfiguration().useInputSplitLocality());
-    InputSplitsHandler splitsHandler = new InputSplitsHandler(
-        splitOrganizer,
-        getZkExt(),
-        getContext(),
-        BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
-        BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
-
-    EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
-        new EdgeInputSplitsCallableFactory<I, V, E>(
-            getConfiguration().createWrappedEdgeInputFormat(),
-            getContext(),
-            getConfiguration(),
-            this,
-            splitsHandler,
-            getZkExt());
-
-    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
-        getEdgeCount();
-  }
-
-  @Override
-  public MasterInfo getMasterInfo() {
-    return masterInfo;
-  }
-
-  @Override
-  public List<WorkerInfo> getWorkerInfoList() {
-    return workerInfoList;
-  }
-
-  /**
-   * Ensure the input splits are ready for processing
-   *
-   * @param inputSplitPaths Input split paths
-   * @param inputSplitEvents Input split events
-   */
-  private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
-                                      InputSplitEvents inputSplitEvents) {
-    while (true) {
-      Stat inputSplitsReadyStat;
-      try {
-        inputSplitsReadyStat = getZkExt().exists(
-            inputSplitPaths.getAllReadyPath(), true);
-      } catch (KeeperException e) {
-        throw new IllegalStateException("ensureInputSplitsReady: " +
-            "KeeperException waiting on input splits", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("ensureInputSplitsReady: " +
-            "InterruptedException waiting on input splits", e);
-      }
-      if (inputSplitsReadyStat != null) {
-        break;
-      }
-      inputSplitEvents.getAllReadyChanged().waitForever();
-      inputSplitEvents.getAllReadyChanged().reset();
-    }
-  }
-
-  /**
-   * Wait for all workers to finish processing input splits.
-   *
-   * @param inputSplitPaths Input split paths
-   * @param inputSplitEvents Input split events
-   */
-  private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
-                                   InputSplitEvents inputSplitEvents) {
-    String workerInputSplitsDonePath =
-        inputSplitPaths.getDonePath() + "/" +
-            getWorkerInfo().getHostnameId();
-    try {
-      getZkExt().createExt(workerInputSplitsDonePath,
-          null,
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true);
-    } catch (KeeperException e) {
-      throw new IllegalStateException("waitForOtherWorkers: " +
-          "KeeperException creating worker done splits", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("waitForOtherWorkers: " +
-          "InterruptedException creating worker done splits", e);
-    }
-    while (true) {
-      Stat inputSplitsDoneStat;
-      try {
-        inputSplitsDoneStat =
-            getZkExt().exists(inputSplitPaths.getAllDonePath(),
-                true);
-      } catch (KeeperException e) {
-        throw new IllegalStateException("waitForOtherWorkers: " +
-            "KeeperException waiting on worker done splits", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("waitForOtherWorkers: " +
-            "InterruptedException waiting on worker done splits", e);
-      }
-      if (inputSplitsDoneStat != null) {
-        break;
-      }
-      inputSplitEvents.getAllDoneChanged().waitForever();
-      inputSplitEvents.getAllDoneChanged().reset();
-    }
-  }
-
-  @Override
-  public FinishedSuperstepStats setup() {
-    // Unless doing a restart, prepare for computation:
-    // 1. Start superstep INPUT_SUPERSTEP (no computation)
-    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
-    // 3. Process input splits until there are no more.
-    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
-    // 5. Process any mutations deriving from add edge requests
-    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
-    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
-      setCachedSuperstep(getRestartedSuperstep());
-      return new FinishedSuperstepStats(0, false, 0, 0, true);
-    }
-
-    JSONObject jobState = getJobState();
-    if (jobState != null) {
-      try {
-        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
-            ApplicationState.START_SUPERSTEP) &&
-            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
-            getSuperstep()) {
-          if (LOG.isInfoEnabled()) {
-            LOG.info("setup: Restarting from an automated " +
-                "checkpointed superstep " +
-                getSuperstep() + ", attempt " +
-                getApplicationAttempt());
-          }
-          setRestartedSuperstep(getSuperstep());
-          return new FinishedSuperstepStats(0, false, 0, 0, true);
-        }
-      } catch (JSONException e) {
-        throw new RuntimeException(
-            "setup: Failed to get key-values from " +
-                jobState.toString(), e);
-      }
-    }
-
-    // Add the partitions that this worker owns
-    Collection<? extends PartitionOwner> masterSetPartitionOwners =
-        startSuperstep();
-    workerGraphPartitioner.updatePartitionOwners(
-        getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
-
-    /*if[HADOOP_NON_SECURE]
-      workerClient.setup();
-    else[HADOOP_NON_SECURE]*/
-    workerClient.setup(getConfiguration().authenticate());
-    /*end[HADOOP_NON_SECURE]*/
-
-    // Initialize aggregator at worker side during setup.
-    // Do this just before vertex and edge loading.
-    aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
-
-    VertexEdgeCount vertexEdgeCount;
-
-    if (getConfiguration().hasVertexInputFormat()) {
-      // Ensure the vertex InputSplits are ready for processing
-      ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
-      getContext().progress();
-      try {
-        vertexEdgeCount = loadVertices();
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "setup: loadVertices failed with InterruptedException", e);
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "setup: loadVertices failed with KeeperException", e);
-      }
-      getContext().progress();
-    } else {
-      vertexEdgeCount = new VertexEdgeCount();
-    }
-
-    if (getConfiguration().hasEdgeInputFormat()) {
-      // Ensure the edge InputSplits are ready for processing
-      ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
-      getContext().progress();
-      try {
-        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "setup: loadEdges failed with InterruptedException", e);
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "setup: loadEdges failed with KeeperException", e);
-      }
-      getContext().progress();
-    }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
-    }
-
-    if (getConfiguration().hasVertexInputFormat()) {
-      // Workers wait for each other to finish, coordinated by master
-      waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
-    }
-
-    if (getConfiguration().hasEdgeInputFormat()) {
-      // Workers wait for each other to finish, coordinated by master
-      waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
-    }
-
-    // Create remaining partitions owned by this worker.
-    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
-      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
-          !getPartitionStore().hasPartition(
-              partitionOwner.getPartitionId())) {
-        Partition<I, V, E> partition =
-            getConfiguration().createPartition(
-                partitionOwner.getPartitionId(), getContext());
-        getPartitionStore().addPartition(partition);
-      }
-    }
-
-    if (getConfiguration().hasEdgeInputFormat()) {
-      // Move edges from temporary storage to their source vertices.
-      getServerData().getEdgeStore().moveEdgesToVertices();
-    }
-
-    // Generate the partition stats for the input superstep and process
-    // if necessary
-    List<PartitionStats> partitionStatsList =
-        new ArrayList<PartitionStats>();
-    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
-      Partition<I, V, E> partition =
-          getPartitionStore().getPartition(partitionId);
-      PartitionStats partitionStats =
-          new PartitionStats(partition.getId(),
-              partition.getVertexCount(),
-              0,
-              partition.getEdgeCount(),
-              0, 0);
-      partitionStatsList.add(partitionStats);
-      getPartitionStore().putPartition(partition);
-    }
-    workerGraphPartitioner.finalizePartitionStats(
-        partitionStatsList, getPartitionStore());
-
-    return finishSuperstep(partitionStatsList);
-  }
-
-  /**
-   * Register the health of this worker for a given superstep
-   *
-   * @param superstep Superstep to register health on
-   */
-  private void registerHealth(long superstep) {
-    JSONArray hostnamePort = new JSONArray();
-    hostnamePort.put(getHostname());
-
-    hostnamePort.put(workerInfo.getPort());
-
-    String myHealthPath = null;
-    if (isHealthy()) {
-      myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
-          getSuperstep());
-    } else {
-      myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
-          getSuperstep());
-    }
-    myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
-    try {
-      myHealthZnode = getZkExt().createExt(
-          myHealthPath,
-          WritableUtils.writeToByteArray(workerInfo),
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.EPHEMERAL,
-          true);
-    } catch (KeeperException.NodeExistsException e) {
-      LOG.warn("registerHealth: myHealthPath already exists (likely " +
-          "from previous failure): " + myHealthPath +
-          ".  Waiting for change in attempts " +
-          "to re-join the application");
-      getApplicationAttemptChangedEvent().waitForever();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("registerHealth: Got application " +
-            "attempt changed event, killing self");
-      }
-      throw new IllegalStateException(
-          "registerHealth: Trying " +
-              "to get the new application attempt by killing self", e);
-    } catch (KeeperException e) {
-      throw new IllegalStateException("Creating " + myHealthPath +
-          " failed with KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("Creating " + myHealthPath +
-          " failed with InterruptedException", e);
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("registerHealth: Created my health node for attempt=" +
-          getApplicationAttempt() + ", superstep=" +
-          getSuperstep() + " with " + myHealthZnode +
-          " and workerInfo= " + workerInfo);
-    }
-  }
-
-  /**
-   * Do this to help notify the master quicker that this worker has failed.
-   */
-  private void unregisterHealth() {
-    LOG.error("unregisterHealth: Got failure, unregistering health on " +
-        myHealthZnode + " on superstep " + getSuperstep());
-    try {
-      getZkExt().deleteExt(myHealthZnode, -1, false);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "unregisterHealth: InterruptedException - Couldn't delete " +
-              myHealthZnode, e);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "unregisterHealth: KeeperException - Couldn't delete " +
-              myHealthZnode, e);
-    }
-  }
-
-  @Override
-  public void failureCleanup() {
-    unregisterHealth();
-  }
-
-  @Override
-  public Collection<? extends PartitionOwner> startSuperstep() {
-    // Algorithm:
-    // 1. Communication service will combine message from previous
-    //    superstep
-    // 2. Register my health for the next superstep.
-    // 3. Wait until the partition assignment is complete and get it
-    // 4. Get the aggregator values from the previous superstep
-    if (getSuperstep() != INPUT_SUPERSTEP) {
-      workerServer.prepareSuperstep();
-    }
-
-    registerHealth(getSuperstep());
-
-    String addressesAndPartitionsPath =
-        getAddressesAndPartitionsPath(getApplicationAttempt(),
-            getSuperstep());
-    AddressesAndPartitionsWritable addressesAndPartitions =
-        new AddressesAndPartitionsWritable(
-            workerGraphPartitioner.createPartitionOwner().getClass());
-    try {
-      while (getZkExt().exists(addressesAndPartitionsPath, true) ==
-          null) {
-        getAddressesAndPartitionsReadyChangedEvent().waitForever();
-        getAddressesAndPartitionsReadyChangedEvent().reset();
-      }
-      WritableUtils.readFieldsFromZnode(
-          getZkExt(),
-          addressesAndPartitionsPath,
-          false,
-          null,
-          addressesAndPartitions);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "startSuperstep: KeeperException getting assignments", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "startSuperstep: InterruptedException getting assignments", e);
-    }
-
-    workerInfoList.clear();
-    workerInfoList = addressesAndPartitions.getWorkerInfos();
-    masterInfo = addressesAndPartitions.getMasterInfo();
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("startSuperstep: " + masterInfo);
-      LOG.info("startSuperstep: Ready for computation on superstep " +
-          getSuperstep() + " since worker " +
-          "selection and vertex range assignments are done in " +
-          addressesAndPartitionsPath);
-    }
-
-    getContext().setStatus("startSuperstep: " +
-        getGraphTaskManager().getGraphFunctions().toString() +
-        " - Attempt=" + getApplicationAttempt() +
-        ", Superstep=" + getSuperstep());
-    return addressesAndPartitions.getPartitionOwners();
-  }
-
-  @Override
-  public FinishedSuperstepStats finishSuperstep(
-      List<PartitionStats> partitionStatsList) {
-    // This barrier blocks until success (or the master signals it to
-    // restart).
-    //
-    // Master will coordinate the barriers and aggregate "doneness" of all
-    // the vertices.  Each worker will:
-    // 1. Ensure that the requests are complete
-    // 2. Execute user postSuperstep() if necessary.
-    // 3. Save aggregator values that are in use.
-    // 4. Report the statistics (vertices, edges, messages, etc.)
-    //    of this worker
-    // 5. Let the master know it is finished.
-    // 6. Wait for the master's superstep info, and check if done
-    waitForRequestsToFinish();
-
-    getGraphTaskManager().notifyFinishedCommunication();
-
-    long workerSentMessages = 0;
-    long workerSentMessageBytes = 0;
-    long localVertices = 0;
-    for (PartitionStats partitionStats : partitionStatsList) {
-      workerSentMessages += partitionStats.getMessagesSentCount();
-      workerSentMessageBytes += partitionStats.getMessageBytesSentCount();
-      localVertices += partitionStats.getVertexCount();
-    }
-
-    if (getSuperstep() != INPUT_SUPERSTEP) {
-      postSuperstepCallbacks();
-    }
-
-    aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
-          ", messages = " + workerSentMessages + " " +
-          ", message bytes = " + workerSentMessageBytes + " , " +
-          MemoryUtils.getRuntimeMemoryStats());
-    }
-
-    writeFinshedSuperstepInfoToZK(partitionStatsList,
-      workerSentMessages, workerSentMessageBytes);
-
-    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-        "finishSuperstep: (waiting for rest " +
-            "of workers) " +
-            getGraphTaskManager().getGraphFunctions().toString() +
-            " - Attempt=" + getApplicationAttempt() +
-            ", Superstep=" + getSuperstep());
-
-    String superstepFinishedNode =
-        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
-
-    waitForOtherWorkers(superstepFinishedNode);
-
-    GlobalStats globalStats = new GlobalStats();
-    SuperstepClasses superstepClasses = new SuperstepClasses();
-    WritableUtils.readFieldsFromZnode(
-        getZkExt(), superstepFinishedNode, false, null, globalStats,
-        superstepClasses);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
-          " with global stats " + globalStats + " and classes " +
-          superstepClasses);
-    }
-    incrCachedSuperstep();
-    getContext().setStatus("finishSuperstep: (all workers done) " +
-        getGraphTaskManager().getGraphFunctions().toString() +
-        " - Attempt=" + getApplicationAttempt() +
-        ", Superstep=" + getSuperstep());
-    getConfiguration().updateSuperstepClasses(superstepClasses);
-
-    return new FinishedSuperstepStats(
-        localVertices,
-        globalStats.getHaltComputation(),
-        globalStats.getVertexCount(),
-        globalStats.getEdgeCount(),
-        false);
-  }
-
-  /**
-   * Handle post-superstep callbacks
-   */
-  private void postSuperstepCallbacks() {
-    GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
-    getWorkerContext().postSuperstep();
-    timerContext.stop();
-    getContext().progress();
-
-    for (WorkerObserver obs : getWorkerObservers()) {
-      obs.postSuperstep(getSuperstep());
-      getContext().progress();
-    }
-  }
-
-  /**
-   * Wait for all the requests to finish.
-   */
-  private void waitForRequestsToFinish() {
-    if (LOG.isInfoEnabled()) {
-      LOG.info("finishSuperstep: Waiting on all requests, superstep " +
-          getSuperstep() + " " +
-          MemoryUtils.getRuntimeMemoryStats());
-    }
-    GiraphTimerContext timerContext = waitRequestsTimer.time();
-    workerClient.waitAllRequests();
-    timerContext.stop();
-  }
-
-  /**
-   * Wait for all the other Workers to finish the superstep.
-   *
-   * @param superstepFinishedNode ZooKeeper path to wait on.
-   */
-  private void waitForOtherWorkers(String superstepFinishedNode) {
-    try {
-      while (getZkExt().exists(superstepFinishedNode, true) == null) {
-        getSuperstepFinishedEvent().waitForever();
-        getSuperstepFinishedEvent().reset();
-      }
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "finishSuperstep: Failed while waiting for master to " +
-              "signal completion of superstep " + getSuperstep(), e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "finishSuperstep: Failed while waiting for master to " +
-              "signal completion of superstep " + getSuperstep(), e);
-    }
-  }
-
-  /**
-   * Write finished superstep info to ZooKeeper.
-   *
-   * @param partitionStatsList List of partition stats from superstep.
-   * @param workerSentMessages Number of messages sent in superstep.
-   * @param workerSentMessageBytes Number of message bytes sent
-   *                               in superstep.
-   */
-  private void writeFinshedSuperstepInfoToZK(
-      List<PartitionStats> partitionStatsList, long workerSentMessages,
-      long workerSentMessageBytes) {
-    Collection<PartitionStats> finalizedPartitionStats =
-        workerGraphPartitioner.finalizePartitionStats(
-            partitionStatsList, getPartitionStore());
-    List<PartitionStats> finalizedPartitionStatsList =
-        new ArrayList<PartitionStats>(finalizedPartitionStats);
-    byte[] partitionStatsBytes =
-        WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
-    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
-    metrics.readFromRegistry();
-    byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);
-
-    JSONObject workerFinishedInfoObj = new JSONObject();
-    try {
-      workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
-          Base64.encodeBytes(partitionStatsBytes));
-      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
-      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
-        workerSentMessageBytes);
-      workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
-          Base64.encodeBytes(metricsBytes));
-    } catch (JSONException e) {
-      throw new RuntimeException(e);
-    }
-
-    String finishedWorkerPath =
-        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
-        "/" + getHostnamePartitionId();
-    try {
-      getZkExt().createExt(finishedWorkerPath,
-          workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true);
-    } catch (KeeperException.NodeExistsException e) {
-      LOG.warn("finishSuperstep: finished worker path " +
-          finishedWorkerPath + " already exists!");
-    } catch (KeeperException e) {
-      throw new IllegalStateException("Creating " + finishedWorkerPath +
-          " failed with KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("Creating " + finishedWorkerPath +
-          " failed with InterruptedException", e);
-    }
-  }
-
-  /**
-   * Save the vertices using the user-defined VertexOutputFormat from our
-   * vertexArray based on the split.
-   *
-   * @param numLocalVertices Number of local vertices
-   * @throws InterruptedException
-   */
-  private void saveVertices(long numLocalVertices) throws IOException,
-      InterruptedException {
-    if (getConfiguration().getVertexOutputFormatClass() == null) {
-      LOG.warn("saveVertices: " +
-          GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
-          " not specified -- there will be no saved output");
-      return;
-    }
-    if (getConfiguration().doOutputDuringComputation()) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("saveVertices: The option for doing output during " +
-            "computation is selected, so there will be no saving of the " +
-            "output in the end of application");
-      }
-      return;
-    }
-
-    final int numPartitions = getPartitionStore().getNumPartitions();
-    int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
-        numPartitions);
-    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-        "saveVertices: Starting to save " + numLocalVertices + " vertices " +
-            "using " + numThreads + " threads");
-    final VertexOutputFormat<I, V, E> vertexOutputFormat =
-        getConfiguration().createWrappedVertexOutputFormat();
-
-    final Queue<Integer> partitionIdQueue =
-        (numPartitions == 0) ? new LinkedList<Integer>() :
-            new ArrayBlockingQueue<Integer>(numPartitions);
-    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
-
-    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
-      @Override
-      public Callable<Void> newCallable(int callableId) {
-        return new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            VertexWriter<I, V, E> vertexWriter =
-                vertexOutputFormat.createVertexWriter(getContext());
-            vertexWriter.setConf(getConfiguration());
-            vertexWriter.initialize(getContext());
-            long nextPrintVertices = 0;
-            long nextPrintMsecs = System.currentTimeMillis() + 15000;
-            int partitionIndex = 0;
-            int numPartitions = getPartitionStore().getNumPartitions();
-            while (!partitionIdQueue.isEmpty()) {
-              Integer partitionId = partitionIdQueue.poll();
-              if (partitionId == null) {
-                break;
-              }
-
-              Partition<I, V, E> partition =
-                  getPartitionStore().getPartition(partitionId);
-              long verticesWritten = 0;
-              for (Vertex<I, V, E> vertex : partition) {
-                vertexWriter.writeVertex(vertex);
-                ++verticesWritten;
-
-                // Update status at most every 250k vertices or 15 seconds
-                if (verticesWritten > nextPrintVertices &&
-                    System.currentTimeMillis() > nextPrintMsecs) {
-                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-                      "saveVertices: Saved " + verticesWritten + " out of " +
-                          partition.getVertexCount() + " partition vertices, " +
-                          "on partition " + partitionIndex +
-                          " out of " + numPartitions);
-                  nextPrintMsecs = System.currentTimeMillis() + 15000;
-                  nextPrintVertices = verticesWritten + 250000;
-                }
-              }
-              getPartitionStore().putPartition(partition);
-              ++partitionIndex;
-            }
-            vertexWriter.close(getContext()); // the temp results are saved now
-            return null;
-          }
-        };
-      }
-    };
-    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
-        "save-vertices-%d", getContext());
-
-    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-      "saveVertices: Done saving vertices.");
-    // YARN: must complete the commit the "task" output, Hadoop isn't there.
-    if (getConfiguration().isPureYarnJob() &&
-      getConfiguration().getVertexOutputFormatClass() != null) {
-      try {
-        OutputCommitter outputCommitter =
-          vertexOutputFormat.getOutputCommitter(getContext());
-        if (outputCommitter.needsTaskCommit(getContext())) {
-          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-            "OutputCommitter: committing task output.");
-          // transfer from temp dirs to "task commit" dirs to prep for
-          // the master's OutputCommitter#commitJob(context) call to finish.
-          outputCommitter.commitTask(getContext());
-        }
-      } catch (InterruptedException ie) {
-        LOG.error("Interrupted while attempting to obtain " +
-          "OutputCommitter.", ie);
-      } catch (IOException ioe) {
-        LOG.error("Master task's attempt to commit output has " +
-          "FAILED.", ioe);
-      }
-    }
-  }
-
-  @Override
-  public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
-    throws IOException, InterruptedException {
-    workerClient.closeConnections();
-    setCachedSuperstep(getSuperstep() - 1);
-    saveVertices(finishedSuperstepStats.getLocalVertexCount());
-    getPartitionStore().shutdown();
-    // All worker processes should denote they are done by adding special
-    // znode.  Once the number of znodes equals the number of partitions
-    // for workers and masters, the master will clean up the ZooKeeper
-    // znodes associated with this job.
-    String workerCleanedUpPath = cleanedUpPath  + "/" +
-        getTaskPartition() + WORKER_SUFFIX;
-    try {
-      String finalFinishedPath =
-          getZkExt().createExt(workerCleanedUpPath,
-              null,
-              Ids.OPEN_ACL_UNSAFE,
-              CreateMode.PERSISTENT,
-              true);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("cleanup: Notifying master its okay to cleanup with " +
-            finalFinishedPath);
-      }
-    } catch (KeeperException.NodeExistsException e) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("cleanup: Couldn't create finished node '" +
-            workerCleanedUpPath);
-      }
-    } catch (KeeperException e) {
-      // Cleaning up, it's okay to fail after cleanup is successful
-      LOG.error("cleanup: Got KeeperException on notification " +
-          "to master about cleanup", e);
-    } catch (InterruptedException e) {
-      // Cleaning up, it's okay to fail after cleanup is successful
-      LOG.error("cleanup: Got InterruptedException on notification " +
-          "to master about cleanup", e);
-    }
-    try {
-      getZkExt().close();
-    } catch (InterruptedException e) {
-      // cleanup phase -- just log the error
-      LOG.error("cleanup: Zookeeper failed to close with " + e);
-    }
-
-    if (getConfiguration().metricsEnabled()) {
-      GiraphMetrics.get().dumpToStream(System.err);
-    }
-
-    // Preferably would shut down the service only after
-    // all clients have disconnected (or the exceptions on the
-    // client side ignored).
-    workerServer.close();
-  }
-
-  @Override
-  public void storeCheckpoint() throws IOException {
-    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-        "storeCheckpoint: Starting checkpoint " +
-            getGraphTaskManager().getGraphFunctions().toString() +
-            " - Attempt=" + getApplicationAttempt() +
-            ", Superstep=" + getSuperstep());
-
-    // Algorithm:
-    // For each partition, dump vertices and messages
-    Path metadataFilePath =
-        new Path(getCheckpointBasePath(getSuperstep()) + "." +
-            getHostnamePartitionId() +
-            CHECKPOINT_METADATA_POSTFIX);
-    Path verticesFilePath =
-        new Path(getCheckpointBasePath(getSuperstep()) + "." +
-            getHostnamePartitionId() +
-            CHECKPOINT_VERTICES_POSTFIX);
-    Path validFilePath =
-        new Path(getCheckpointBasePath(getSuperstep()) + "." +
-            getHostnamePartitionId() +
-            CHECKPOINT_VALID_POSTFIX);
-
-    // Remove these files if they already exist (shouldn't though, unless
-    // of previous failure of this worker)
-    if (getFs().delete(validFilePath, false)) {
-      LOG.warn("storeCheckpoint: Removed valid file " +
-          validFilePath);
-    }
-    if (getFs().delete(metadataFilePath, false)) {
-      LOG.warn("storeCheckpoint: Removed metadata file " +
-          metadataFilePath);
-    }
-    if (getFs().delete(verticesFilePath, false)) {
-      LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
-    }
-
-    FSDataOutputStream verticesOutputStream =
-        getFs().create(verticesFilePath);
-    ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
-    DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
-    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
-      Partition<I, V, E> partition =
-          getPartitionStore().getPartition(partitionId);
-      long startPos = verticesOutputStream.getPos();
-      partition.write(verticesOutputStream);
-      // write messages
-      getServerData().getCurrentMessageStore().writePartition(
-          verticesOutputStream, partition.getId());
-      // Write the metadata for this partition
-      // Format:
-      // <index count>
-      //   <index 0 start pos><partition id>
-      //   <index 1 start pos><partition id>
-      metadataOutput.writeLong(startPos);
-      metadataOutput.writeInt(partition.getId());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("storeCheckpoint: Vertex file starting " +
-            "offset = " + startPos + ", length = " +
-            (verticesOutputStream.getPos() - startPos) +
-            ", partition = " + partition.toString());
-      }
-      getPartitionStore().putPartition(partition);
-      getContext().progress();
-    }
-    // Metadata is buffered and written at the end since it's small and
-    // needs to know how many partitions this worker owns
-    FSDataOutputStream metadataOutputStream =
-        getFs().create(metadataFilePath);
-    metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
-    metadataOutputStream.write(metadataByteStream.toByteArray());
-    metadataOutputStream.close();
-    verticesOutputStream.close();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("storeCheckpoint: Finished metadata (" +
-          metadataFilePath + ") and vertices (" + verticesFilePath + ").");
-    }
-
-    getFs().createNewFile(validFilePath);
-
-    // Notify master that checkpoint is stored
-    String workerWroteCheckpoint =
-        getWorkerWroteCheckpointPath(getApplicationAttempt(),
-            getSuperstep()) + "/" + getHostnamePartitionId();
-    try {
-      getZkExt().createExt(workerWroteCheckpoint,
-          new byte[0],
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true);
-    } catch (KeeperException.NodeExistsException e) {
-      LOG.warn("storeCheckpoint: wrote checkpoint worker path " +
-          workerWroteCheckpoint + " already exists!");
-    } catch (KeeperException e) {
-      throw new IllegalStateException("Creating " + workerWroteCheckpoint +
-          " failed with KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("Creating " +
-          workerWroteCheckpoint +
-          " failed with InterruptedException", e);
-    }
-  }
-
-  @Override
-  public VertexEdgeCount loadCheckpoint(long superstep) {
-    try {
-      // clear old message stores
-      getServerData().getIncomingMessageStore().clearAll();
-      getServerData().getCurrentMessageStore().clearAll();
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "loadCheckpoint: Failed to clear message stores ", e);
-    }
-
-    // Algorithm:
-    // Examine all the partition owners and load the ones
-    // that match my hostname and id from the master designated checkpoint
-    // prefixes.
-    long startPos = 0;
-    int loadedPartitions = 0;
-    for (PartitionOwner partitionOwner :
-      workerGraphPartitioner.getPartitionOwners()) {
-      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
-        String metadataFile =
-            partitionOwner.getCheckpointFilesPrefix() +
-            CHECKPOINT_METADATA_POSTFIX;
-        String partitionsFile =
-            partitionOwner.getCheckpointFilesPrefix() +
-            CHECKPOINT_VERTICES_POSTFIX;
-        try {
-          int partitionId = -1;
-          DataInputStream metadataStream =
-              getFs().open(new Path(metadataFile));
-          int partitions = metadataStream.readInt();
-          for (int i = 0; i < partitions; ++i) {
-            startPos = metadataStream.readLong();
-            partitionId = metadataStream.readInt();
-            if (partitionId == partitionOwner.getPartitionId()) {
-              break;
-            }
-          }
-          if (partitionId != partitionOwner.getPartitionId()) {
-            throw new IllegalStateException(
-                "loadCheckpoint: " + partitionOwner +
-                " not found!");
-          }
-          metadataStream.close();
-          Partition<I, V, E> partition =
-              getConfiguration().createPartition(partitionId, getContext());
-          DataInputStream partitionsStream =
-              getFs().open(new Path(partitionsFile));
-          if (partitionsStream.skip(startPos) != startPos) {
-            throw new IllegalStateException(
-                "loadCheckpoint: Failed to skip " + startPos +
-                " on " + partitionsFile);
-          }
-          partition.readFields(partitionsStream);
-          getServerData().getIncomingMessageStore().readFieldsForPartition(
-              partitionsStream, partitionId);
-          partitionsStream.close();
-          if (LOG.isInfoEnabled()) {
-            LOG.info("loadCheckpoint: Loaded partition " +
-                partition);
-          }
-          if (getPartitionStore().hasPartition(partitionId)) {
-            throw new IllegalStateException(
-                "loadCheckpoint: Already has partition owner " +
-                    partitionOwner);
-          }
-          getPartitionStore().addPartition(partition);
-          getContext().progress();
-          ++loadedPartitions;
-        } catch (IOException e) {
-          throw new RuntimeException(
-              "loadCheckpoint: Failed to get partition owner " +
-                  partitionOwner, e);
-        }
-      }
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
-          " partitions of out " +
-          workerGraphPartitioner.getPartitionOwners().size() +
-          " total.");
-    }
-
-    // Load global stats and superstep classes
-    GlobalStats globalStats = new GlobalStats();
-    SuperstepClasses superstepClasses = new SuperstepClasses();
-    String finalizedCheckpointPath =
-        getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
-    try {
-      DataInputStream finalizedStream =
-          getFs().open(new Path(finalizedCheckpointPath));
-      globalStats.readFields(finalizedStream);
-      superstepClasses.readFields(finalizedStream);
-      getConfiguration().updateSuperstepClasses(superstepClasses);
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "loadCheckpoint: Failed to load global stats and superstep classes",
-          e);
-    }
-
-    getServerData().prepareSuperstep();
-    // Communication service needs to setup the connections prior to
-    // processing vertices
-/*if[HADOOP_NON_SECURE]
-    workerClient.setup();
-else[HADOOP_NON_SECURE]*/
-    workerClient.setup(getConfiguration().authenticate());
-/*end[HADOOP_NON_SECURE]*/
-    return new VertexEdgeCount(globalStats.getVertexCount(),
-        globalStats.getEdgeCount());
-  }
-
-  /**
-   * Send the worker partitions to their destination workers
-   *
-   * @param workerPartitionMap Map of worker info to the partitions stored
-   *        on this worker to be sent
-   */
-  private void sendWorkerPartitions(
-      Map<WorkerInfo, List<Integer>> workerPartitionMap) {
-    List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
-        new ArrayList<Entry<WorkerInfo, List<Integer>>>(
-            workerPartitionMap.entrySet());
-    Collections.shuffle(randomEntryList);
-    WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
-        new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
-            getConfiguration(), this);
-    for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
-      randomEntryList) {
-      for (Integer partitionId : workerPartitionList.getValue()) {
-        Partition<I, V, E> partition =
-            getPartitionStore().removePartition(partitionId);
-        if (partition == null) {
-          throw new IllegalStateException(
-              "sendWorkerPartitions: Couldn't find partition " +
-                  partitionId + " to send to " +
-                  workerPartitionList.getKey());
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("sendWorkerPartitions: Sending worker " +
-              workerPartitionList.getKey() + " partition " +
-              partitionId);
-        }
-        workerClientRequestProcessor.sendPartitionRequest(
-            workerPartitionList.getKey(),
-            partition);
-      }
-    }
-
-
-    try {
-      workerClientRequestProcessor.flush();
-      workerClient.waitAllRequests();
-    } catch (IOException e) {
-      throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
-    }
-    String myPartitionExchangeDonePath =
-        getPartitionExchangeWorkerPath(
-            getApplicationAttempt(), getSuperstep(), getWorkerInfo());
-    try {
-      getZkExt().createExt(myPartitionExchangeDonePath,
-          null,
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "sendWorkerPartitions: KeeperException to create " +
-              myPartitionExchangeDonePath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "sendWorkerPartitions: InterruptedException to create " +
-              myPartitionExchangeDonePath, e);
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("sendWorkerPartitions: Done sending all my partitions.");
-    }
-  }
-
-  @Override
-  public final void exchangeVertexPartitions(
-      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
-    // 1. Fix the addresses of the partition ids if they have changed.
-    // 2. Send all the partitions to their destination workers in a random
-    //    fashion.
-    // 3. Notify completion with a ZooKeeper stamp
-    // 4. Wait for all my dependencies to be done (if any)
-    // 5. Add the partitions to myself.
-    PartitionExchange partitionExchange =
-        workerGraphPartitioner.updatePartitionOwners(
-            getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
-    workerClient.openConnections();
-
-    Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
-        partitionExchange.getSendWorkerPartitionMap();
-    if (!getPartitionStore().isEmpty()) {
-      sendWorkerPartitions(sendWorkerPartitionMap);
-    }
-
-    Set<WorkerInfo> myDependencyWorkerSet =
-        partitionExchange.getMyDependencyWorkerSet();
-    Set<String> workerIdSet = new HashSet<String>();
-    for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
-      if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
-        throw new IllegalStateException(
-            "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
-      }
-    }
-    if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
-            "exiting early");
-      }
-      return;
-    }
-
-    String vertexExchangePath =
-        getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
-    List<String> workerDoneList;
-    try {
-      while (true) {
-        workerDoneList = getZkExt().getChildrenExt(
-            vertexExchangePath, true, false, false);
-        workerIdSet.removeAll(workerDoneList);
-        if (workerIdSet.isEmpty()) {
-          break;
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("exchangeVertexPartitions: Waiting for workers " +
-              workerIdSet);
-        }
-        getPartitionExchangeChildrenChangedEvent().waitForever();
-        getPartitionExchangeChildrenChangedEvent().reset();
-      }
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("exchangeVertexPartitions: Done with exchange.");
-    }
-  }
-
-  /**
-   * Get event when the state of a partition exchange has changed.
-   *
-   * @return Event to check.
-   */
-  public final BspEvent getPartitionExchangeChildrenChangedEvent() {
-    return partitionExchangeChildrenChanged;
-  }
-
-  @Override
-  protected boolean processEvent(WatchedEvent event) {
-    boolean foundEvent = false;
-    if (event.getPath().startsWith(masterJobStatePath) &&
-        (event.getType() == EventType.NodeChildrenChanged)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("processEvent: Job state changed, checking " +
-            "to see if it needs to restart");
-      }
-      JSONObject jsonObj = getJobState();
-      // in YARN, we have to manually commit our own output in 2 stages that we
-      // do not have to do in Hadoop-based Giraph. So jsonObj can be null.
-      if (getConfiguration().isPureYarnJob() && null == jsonObj) {
-        LOG.error("BspServiceWorker#getJobState() came back NULL.");
-        return false; // the event has been processed.
-      }
-      try {
-        if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
-            ApplicationState.START_SUPERSTEP) &&
-            jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
-            getApplicationAttempt()) {
-          LOG.fatal("processEvent: Worker will restart " +
-              "from command - " + jsonObj.toString());
-          System.exit(-1);
-        }
-      } catch (JSONException e) {
-        throw new RuntimeException(
-            "processEvent: Couldn't properly get job state from " +
-                jsonObj.toString());
-      }
-      foundEvent = true;
-    } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
-        event.getType() == EventType.NodeChildrenChanged) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("processEvent : partitionExchangeChildrenChanged " +
-            "(at least one worker is done sending partitions)");
-      }
-      partitionExchangeChildrenChanged.signal();
-      foundEvent = true;
-    }
-
-    return foundEvent;
-  }
-
-  @Override
-  public WorkerInfo getWorkerInfo() {
-    return workerInfo;
-  }
-
-  @Override
-  public PartitionStore<I, V, E> getPartitionStore() {
-    return getServerData().getPartitionStore();
-  }
-
-  @Override
-  public PartitionOwner getVertexPartitionOwner(I vertexId) {
-    return workerGraphPartitioner.getPartitionOwner(vertexId);
-  }
-
-  @Override
-  public Iterable<? extends PartitionOwner> getPartitionOwners() {
-    return workerGraphPartitioner.getPartitionOwners();
-  }
-
-  @Override
-  public int getPartitionId(I vertexId) {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
-    return partitionOwner.getPartitionId();
-  }
-
-  @Override
-  public boolean hasPartition(Integer partitionId) {
-    return getPartitionStore().hasPartition(partitionId);
-  }
-
-  @Override
-  public ServerData<I, V, E> getServerData() {
-    return workerServer.getServerData();
-  }
-
-  @Override
-  public WorkerAggregatorHandler getAggregatorHandler() {
-    return aggregatorHandler;
-  }
-
-  @Override
-  public void prepareSuperstep() {
-    if (getSuperstep() != INPUT_SUPERSTEP) {
-      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
-    }
-  }
-
-  @Override
-  public SuperstepOutput<I, V, E> getSuperstepOutput() {
-    return superstepOutput;
-  }
-}