Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC

[5/23] GIRAPH-409: Refactor / cleanups (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
new file mode 100644
index 0000000..f33fe58
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -0,0 +1,1405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerClient;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerServer;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GlobalStats;
+import org.apache.giraph.graph.GraphMapper;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.InputSplitPaths;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionExchange;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphTimer;
+import org.apache.giraph.metrics.GiraphTimerContext;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import net.iharder.Base64;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class BspServiceWorker<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends BspService<I, V, E, M>
+    implements CentralizedServiceWorker<I, V, E, M>,
+    ResetSuperstepMetricsObserver {
+  /** Name of gauge for time spent waiting on other workers */
+  public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
+  /** My process health znode */
+  private String myHealthZnode;
+  /** Worker info */
+  private final WorkerInfo workerInfo;
+  /** Worker graph partitioner */
+  private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
+
+  /** IPC Client */
+  private final WorkerClient<I, V, E, M> workerClient;
+  /** IPC Server */
+  private final WorkerServer<I, V, E, M> workerServer;
+  /** Request processor for aggregator requests */
+  private final WorkerAggregatorRequestProcessor
+  workerAggregatorRequestProcessor;
+  /** Master info */
+  private MasterInfo masterInfo = new MasterInfo();
+  /** List of workers */
+  private List<WorkerInfo> workerInfoList = Lists.newArrayList();
+  /** Have the partition exchange children (workers) changed? */
+  private final BspEvent partitionExchangeChildrenChanged;
+
+  /** Worker Context */
+  private final WorkerContext workerContext;
+
+  /** Handler for aggregators */
+  private final WorkerAggregatorHandler aggregatorHandler;
+
+  // Per-Superstep Metrics
+  /** Timer for WorkerContext#postSuperstep */
+  private GiraphTimer wcPostSuperstepTimer;
+  /** Time spent waiting on requests to finish */
+  private GiraphTimer waitRequestsTimer;
+
+  /**
+   * Constructor for setting up the worker.
+   *
+   * @param serverPortList ZooKeeper server port list
+   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
+   * @param context Mapper context
+   * @param graphMapper Graph mapper
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public BspServiceWorker(
+    String serverPortList,
+    int sessionMsecTimeout,
+    Mapper<?, ?, ?, ?>.Context context,
+    GraphMapper<I, V, E, M> graphMapper)
+    throws IOException, InterruptedException {
+    super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    partitionExchangeChildrenChanged = new PredicateLock(context);
+    registerBspEvent(partitionExchangeChildrenChanged);
+    workerGraphPartitioner =
+        getGraphPartitionerFactory().createWorkerGraphPartitioner();
+    workerInfo = new WorkerInfo();
+    workerServer =
+        new NettyWorkerServer<I, V, E, M>(getConfiguration(), this, context);
+    workerInfo.setInetSocketAddress(workerServer.getMyAddress());
+    workerInfo.setTaskId(getTaskPartition());
+    workerClient =
+        new NettyWorkerClient<I, V, E, M>(context, getConfiguration(), this);
+
+    workerAggregatorRequestProcessor =
+        new NettyWorkerAggregatorRequestProcessor(getContext(),
+            getConfiguration(), this);
+
+    this.workerContext = getConfiguration().createWorkerContext(null);
+
+    aggregatorHandler =
+        new WorkerAggregatorHandler(this, getConfiguration(), context);
+
+    GiraphMetrics.get().addSuperstepResetObserver(this);
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    waitRequestsTimer = new GiraphTimer(superstepMetrics,
+        TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
+    wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
+        "worker-context-post-superstep", TimeUnit.MICROSECONDS);
+  }
+
+  @Override
+  public WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  @Override
+  public WorkerClient<I, V, E, M> getWorkerClient() {
+    return workerClient;
+  }
+
+  /**
+   * Intended to check the health of the node: for instance, whether it can
+   * ssh, whether dmesg reports problems, etc. For now it performs no checks
+   * and always reports healthy.
+   * TODO: Make this check configurable by the user (i.e. search dmesg for
+   * problems).
+   *
+   * @return True if healthy (always, in the current implementation).
+   */
+  public boolean isHealthy() {
+    return true;
+  }
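
A configurable variant along the lines of that TODO might shell out to a
user-supplied script. A hypothetical sketch (the "giraph.healthCheckScript"
configuration key is invented for illustration, not a real Giraph option):

    // Hypothetical: delegate to a user-supplied script, healthy on exit 0.
    public boolean isHealthy() {
      String script = getConfiguration().get("giraph.healthCheckScript");
      if (script == null) {
        return true;  // no script configured: keep the always-healthy behavior
      }
      try {
        return Runtime.getRuntime().exec(script).waitFor() == 0;
      } catch (IOException e) {
        return false;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }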
+
+  /**
+   * Load the vertices/edges from input splits. Do this until all the
+   * InputSplits have been processed.
+   * All workers will try to do as many InputSplits as they can.  The master
+   * will monitor progress and stop this once all the InputSplits have been
+   * loaded and check-pointed.  Keep track of the last input split path to
+   * ensure the input split cache is flushed prior to marking the last input
+   * split complete.
+   *
+   * Use one or more threads to do the loading.
+   *
+   * @param inputSplitPathList List of input split paths
+   * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
+   * @return Statistics of the vertices and edges loaded
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  private VertexEdgeCount loadInputSplits(
+      List<String> inputSplitPathList,
+      InputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory)
+    throws KeeperException, InterruptedException {
+    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+    // Determine how many threads to use based on the number of input splits
+    int maxInputSplitThreads =
+        Math.max(
+            inputSplitPathList.size() / getConfiguration().getMaxWorkers(), 1);
+    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+        maxInputSplitThreads);
+    ExecutorService inputSplitsExecutor =
+        Executors.newFixedThreadPool(numThreads,
+            new ThreadFactoryBuilder().setNameFormat("load-%d").build());
+    List<Future<VertexEdgeCount>> threadsFutures =
+        Lists.newArrayListWithCapacity(numThreads);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
+          "originally " + getConfiguration().getNumInputSplitsThreads() +
+          " threads(s) for " + inputSplitPathList.size() + " total splits.");
+    }
+    for (int i = 0; i < numThreads; ++i) {
+      Callable<VertexEdgeCount> inputSplitsCallable =
+          inputSplitsCallableFactory.newCallable();
+      threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
+    }
+
+    // Wait until all the threads are done to wait on all requests
+    for (Future<VertexEdgeCount> threadFuture : threadsFutures) {
+      VertexEdgeCount threadVertexEdgeCount =
+          ProgressableUtils.getFutureResult(threadFuture, getContext());
+      vertexEdgeCount =
+          vertexEdgeCount.incrVertexEdgeCount(threadVertexEdgeCount);
+    }
+
+    workerClient.waitAllRequests();
+    inputSplitsExecutor.shutdown();
+    return vertexEdgeCount;
+  }
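
To make the thread-count heuristic above concrete, a standalone worked
example (all numbers are made up):

    // Hypothetical job: 200 input splits, 50 workers, 10 configured threads.
    int numSplits = 200;
    int maxWorkers = 50;
    int configuredThreads = 10;
    // Same arithmetic as loadInputSplits above:
    int maxInputSplitThreads = Math.max(numSplits / maxWorkers, 1);      // 4
    int numThreads = Math.min(configuredThreads, maxInputSplitThreads);  // 4
    // Each worker loads with 4 threads, even though 10 were configured.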
+
+  /**
+   * Load the vertices from the user-defined {@link VertexReader}.
+   *
+   * @return Count of vertices and edges loaded
+   */
+  private VertexEdgeCount loadVertices() throws KeeperException,
+      InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
+            false, false, true);
+
+    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
+        null, null);
+
+    VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
+        new VertexInputSplitsCallableFactory<I, V, E, M>(
+            getContext(),
+            graphState,
+            getConfiguration(),
+            this,
+            inputSplitPathList,
+            getWorkerInfo(),
+            getZkExt());
+
+    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
+  }
+
+  /**
+   * Load the edges from the user-defined {@link EdgeReader}.
+   *
+   * @return Number of edges loaded
+   */
+  private long loadEdges() throws KeeperException, InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
+            false, false, true);
+
+    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
+        null, null);
+
+    EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
+        new EdgeInputSplitsCallableFactory<I, V, E, M>(
+            getContext(),
+            graphState,
+            getConfiguration(),
+            this,
+            inputSplitPathList,
+            getWorkerInfo(),
+            getZkExt());
+
+    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
+        getEdgeCount();
+  }
+
+  @Override
+  public MasterInfo getMasterInfo() {
+    return masterInfo;
+  }
+
+  @Override
+  public List<WorkerInfo> getWorkerInfoList() {
+    return workerInfoList;
+  }
+
+  /**
+   * Ensure the input splits are ready for processing
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
+                                      InputSplitEvents inputSplitEvents) {
+    while (true) {
+      Stat inputSplitsReadyStat;
+      try {
+        inputSplitsReadyStat = getZkExt().exists(
+            inputSplitPaths.getAllReadyPath(), true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "KeeperException waiting on input splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "InterruptedException waiting on input splits", e);
+      }
+      if (inputSplitsReadyStat != null) {
+        break;
+      }
+      inputSplitEvents.getAllReadyChanged().waitForever();
+      inputSplitEvents.getAllReadyChanged().reset();
+    }
+  }
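
The loop above is ZooKeeper's standard exists-watch-wait pattern, with
Giraph's BspEvent playing the role of the latch. A minimal sketch of the
same pattern against the raw ZooKeeper client (assumes
java.util.concurrent.CountDownLatch plus the org.apache.zookeeper classes
this file already imports):

    /** Block until the znode at {@code path} exists, re-arming the watch. */
    static void waitForZnode(ZooKeeper zk, String path)
        throws KeeperException, InterruptedException {
      while (true) {
        final CountDownLatch changed = new CountDownLatch(1);
        Stat stat = zk.exists(path, new Watcher() {
          @Override
          public void process(WatchedEvent event) {
            changed.countDown();  // any event on the path wakes the waiter
          }
        });
        if (stat != null) {
          return;  // node exists
        }
        changed.await();  // sleep until the watch fires, then re-check
      }
    }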
+
+  /**
+   * Wait for all workers to finish processing input splits.
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
+                                   InputSplitEvents inputSplitEvents) {
+    String workerInputSplitsDonePath =
+        inputSplitPaths.getDonePath() + "/" +
+            getWorkerInfo().getHostnameId();
+    try {
+      getZkExt().createExt(workerInputSplitsDonePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "KeeperException creating worker done splits", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "InterruptedException creating worker done splits", e);
+    }
+    while (true) {
+      Stat inputSplitsDoneStat;
+      try {
+        inputSplitsDoneStat =
+            getZkExt().exists(inputSplitPaths.getAllDonePath(),
+                true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "KeeperException waiting on worker done splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "InterruptedException waiting on worker done splits", e);
+      }
+      if (inputSplitsDoneStat != null) {
+        break;
+      }
+      inputSplitEvents.getAllDoneChanged().waitForever();
+      inputSplitEvents.getAllDoneChanged().reset();
+    }
+  }
+
+  @Override
+  public FinishedSuperstepStats setup() {
+    // Unless doing a restart, prepare for computation:
+    // 1. Start superstep INPUT_SUPERSTEP (no computation)
+    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
+    // 3. Process input splits until there are no more.
+    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
+    // 5. Process any mutations deriving from add edge requests
+    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
+    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+      setCachedSuperstep(getRestartedSuperstep());
+      return new FinishedSuperstepStats(false, -1, -1);
+    }
+
+    JSONObject jobState = getJobState();
+    if (jobState != null) {
+      try {
+        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
+            ApplicationState.START_SUPERSTEP) &&
+            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
+            getSuperstep()) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("setup: Restarting from an automated " +
+                "checkpointed superstep " +
+                getSuperstep() + ", attempt " +
+                getApplicationAttempt());
+          }
+          setRestartedSuperstep(getSuperstep());
+          return new FinishedSuperstepStats(false, -1, -1);
+        }
+      } catch (JSONException e) {
+        throw new RuntimeException(
+            "setup: Failed to get key-values from " +
+                jobState.toString(), e);
+      }
+    }
+
+    // Add the partitions that this worker owns
+    GraphState<I, V, E, M> graphState =
+        new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
+            getContext(), getGraphMapper(), null, null);
+    Collection<? extends PartitionOwner> masterSetPartitionOwners =
+        startSuperstep(graphState);
+    workerGraphPartitioner.updatePartitionOwners(
+        getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+
+/*if[HADOOP_NON_SECURE]
+    workerClient.setup();
+else[HADOOP_NON_SECURE]*/
+    workerClient.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
+
+    VertexEdgeCount vertexEdgeCount;
+
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Ensure the vertex InputSplits are ready for processing
+      ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
+      getContext().progress();
+      try {
+        vertexEdgeCount = loadVertices();
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadVertices failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadVertices failed with KeeperException", e);
+      }
+      getContext().progress();
+    } else {
+      vertexEdgeCount = new VertexEdgeCount();
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Ensure the edge InputSplits are ready for processing
+      ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
+      getContext().progress();
+      try {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadEdges failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadEdges failed with KeeperException", e);
+      }
+      getContext().progress();
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
+    }
+
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
+    }
+
+    // Create remaining partitions owned by this worker.
+    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
+          !getPartitionStore().hasPartition(
+              partitionOwner.getPartitionId())) {
+        Partition<I, V, E, M> partition =
+            getConfiguration().createPartition(
+                partitionOwner.getPartitionId(), getContext());
+        getPartitionStore().addPartition(partition);
+      }
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Create vertices from added edges via vertex resolver.
+      // Doing this at the beginning of superstep 0 is not enough,
+      // because we want the vertex/edge stats to be accurate.
+      workerServer.resolveMutations(graphState);
+    }
+
+    // Generate the partition stats for the input superstep and process
+    // if necessary
+    List<PartitionStats> partitionStatsList =
+        new ArrayList<PartitionStats>();
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
+      PartitionStats partitionStats =
+          new PartitionStats(partition.getId(),
+              partition.getVertexCount(),
+              0,
+              partition.getEdgeCount(),
+              0);
+      partitionStatsList.add(partitionStats);
+    }
+    workerGraphPartitioner.finalizePartitionStats(
+        partitionStatsList, getPartitionStore());
+
+    return finishSuperstep(graphState, partitionStatsList);
+  }
+
+  /**
+   * Register the health of this worker for a given superstep
+   *
+   * @param superstep Superstep to register health on
+   */
+  private void registerHealth(long superstep) {
+    JSONArray hostnamePort = new JSONArray();
+    hostnamePort.put(getHostname());
+
+    hostnamePort.put(workerInfo.getPort());
+
+    String myHealthPath = null;
+    if (isHealthy()) {
+      myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
+          getSuperstep());
+    } else {
+      myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
+          getSuperstep());
+    }
+    myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
+    try {
+      myHealthZnode = getZkExt().createExt(
+          myHealthPath,
+          WritableUtils.writeToByteArray(workerInfo),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.EPHEMERAL,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("registerHealth: myHealthPath already exists (likely " +
+          "from previous failure): " + myHealthPath +
+          ".  Waiting for change in attempts " +
+          "to re-join the application");
+      getApplicationAttemptChangedEvent().waitForever();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("registerHealth: Got application " +
+            "attempt changed event, killing self");
+      }
+      throw new IllegalStateException(
+          "registerHealth: Trying " +
+              "to get the new application attempt by killing self", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with InterruptedException", e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("registerHealth: Created my health node for attempt=" +
+          getApplicationAttempt() + ", superstep=" +
+          getSuperstep() + " with " + myHealthZnode +
+          " and workerInfo= " + workerInfo);
+    }
+  }
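
The EPHEMERAL create mode above is what makes this registration double as
failure detection: if the worker's ZooKeeper session dies, the znode
disappears and the master can react. A raw-client sketch of the same idea
(path and payload are placeholders):

    // Ephemeral znode: ZooKeeper deletes it when this session ends.
    zk.create("/_hb/healthy/worker_hostname_0",  // placeholder path
        new byte[0],                             // placeholder payload
        ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL);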
+
+  /**
+   * Do this to help notify the master quicker that this worker has failed.
+   */
+  private void unregisterHealth() {
+    LOG.error("unregisterHealth: Got failure, unregistering health on " +
+        myHealthZnode + " on superstep " + getSuperstep());
+    try {
+      getZkExt().deleteExt(myHealthZnode, -1, false);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: InterruptedException - Couldn't delete " +
+              myHealthZnode, e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: KeeperException - Couldn't delete " +
+              myHealthZnode, e);
+    }
+  }
+
+  @Override
+  public void failureCleanup() {
+    unregisterHealth();
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> startSuperstep(
+      GraphState<I, V, E, M> graphState) {
+    // Algorithm:
+    // 1. Communication service will combine message from previous
+    //    superstep
+    // 2. Register my health for the next superstep.
+    // 3. Wait until the partition assignment is complete and get it
+    // 4. Get the aggregator values from the previous superstep
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      workerServer.prepareSuperstep(graphState);
+    }
+
+    registerHealth(getSuperstep());
+
+    String addressesAndPartitionsPath =
+        getAddressesAndPartitionsPath(getApplicationAttempt(),
+            getSuperstep());
+    AddressesAndPartitionsWritable addressesAndPartitions =
+        new AddressesAndPartitionsWritable(
+            workerGraphPartitioner.createPartitionOwner().getClass());
+    try {
+      while (getZkExt().exists(addressesAndPartitionsPath, true) ==
+          null) {
+        getAddressesAndPartitionsReadyChangedEvent().waitForever();
+        getAddressesAndPartitionsReadyChangedEvent().reset();
+      }
+      WritableUtils.readFieldsFromZnode(
+          getZkExt(),
+          addressesAndPartitionsPath,
+          false,
+          null,
+          addressesAndPartitions);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "startSuperstep: KeeperException getting assignments", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "startSuperstep: InterruptedException getting assignments", e);
+    }
+
+    workerInfoList.clear();
+    workerInfoList = addressesAndPartitions.getWorkerInfos();
+    masterInfo = addressesAndPartitions.getMasterInfo();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("startSuperstep: " + masterInfo);
+      LOG.info("startSuperstep: Ready for computation on superstep " +
+          getSuperstep() + " since worker " +
+          "selection and vertex range assignments are done in " +
+          addressesAndPartitionsPath);
+    }
+
+    getContext().setStatus("startSuperstep: " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+    return addressesAndPartitions.getPartitionOwners();
+  }
+
+  @Override
+  public FinishedSuperstepStats finishSuperstep(
+      GraphState<I, V, E, M> graphState,
+      List<PartitionStats> partitionStatsList) {
+    // This barrier blocks until success (or the master signals it to
+    // restart).
+    //
+    // Master will coordinate the barriers and aggregate "doneness" of all
+    // the vertices.  Each worker will:
+    // 1. Ensure that the requests are complete
+    // 2. Execute user postSuperstep() if necessary.
+    // 3. Save aggregator values that are in use.
+    // 4. Report the statistics (vertices, edges, messages, etc.)
+    //    of this worker
+    // 5. Let the master know it is finished.
+    // 6. Wait for the master's global stats, and check if done
+    waitForRequestsToFinish();
+
+    graphState.getGraphMapper().notifyFinishedCommunication();
+
+    long workerSentMessages = 0;
+    for (PartitionStats partitionStats : partitionStatsList) {
+      workerSentMessages += partitionStats.getMessagesSentCount();
+    }
+
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      getWorkerContext().setGraphState(graphState);
+      GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
+      getWorkerContext().postSuperstep();
+      timerContext.stop();
+      getContext().progress();
+    }
+
+    aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
+          ", messages = " + workerSentMessages + " " +
+          MemoryUtils.getRuntimeMemoryStats());
+    }
+
+    writeFinishedSuperstepInfoToZK(partitionStatsList, workerSentMessages);
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "finishSuperstep: (waiting for rest " +
+            "of workers) " +
+            getGraphMapper().getMapFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep());
+
+    String superstepFinishedNode =
+        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
+
+    waitForOtherWorkers(superstepFinishedNode);
+
+    GlobalStats globalStats = new GlobalStats();
+    WritableUtils.readFieldsFromZnode(
+        getZkExt(), superstepFinishedNode, false, null, globalStats);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
+          " with global stats " + globalStats);
+    }
+    incrCachedSuperstep();
+    getContext().setStatus("finishSuperstep: (all workers done) " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+
+    return new FinishedSuperstepStats(
+        globalStats.getHaltComputation(),
+        globalStats.getVertexCount(),
+        globalStats.getEdgeCount());
+  }
+
+  /**
+   * Wait for all the requests to finish.
+   */
+  private void waitForRequestsToFinish() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Waiting on all requests, superstep " +
+          getSuperstep() + " " +
+          MemoryUtils.getRuntimeMemoryStats());
+    }
+    GiraphTimerContext timerContext = waitRequestsTimer.time();
+    workerClient.waitAllRequests();
+    timerContext.stop();
+  }
+
+  /**
+   * Wait for all the other Workers to finish the superstep.
+   *
+   * @param superstepFinishedNode ZooKeeper path to wait on.
+   */
+  private void waitForOtherWorkers(String superstepFinishedNode) {
+    try {
+      while (getZkExt().exists(superstepFinishedNode, true) == null) {
+        getSuperstepFinishedEvent().waitForever();
+        getSuperstepFinishedEvent().reset();
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+              "signal completion of superstep " + getSuperstep(), e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+              "signal completion of superstep " + getSuperstep(), e);
+    }
+  }
+
+  /**
+   * Write finished superstep info to ZooKeeper.
+   *
+   * @param partitionStatsList List of partition stats from superstep.
+   * @param workerSentMessages Number of messages sent in superstep.
+   */
+  private void writeFinishedSuperstepInfoToZK(
+      List<PartitionStats> partitionStatsList, long workerSentMessages) {
+    Collection<PartitionStats> finalizedPartitionStats =
+        workerGraphPartitioner.finalizePartitionStats(
+            partitionStatsList, getPartitionStore());
+    List<PartitionStats> finalizedPartitionStatsList =
+        new ArrayList<PartitionStats>(finalizedPartitionStats);
+    byte[] partitionStatsBytes =
+        WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
+    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
+    metrics.readFromRegistry();
+    byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);
+
+    JSONObject workerFinishedInfoObj = new JSONObject();
+    try {
+      workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
+          Base64.encodeBytes(partitionStatsBytes));
+      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
+      workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
+          Base64.encodeBytes(metricsBytes));
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+
+    String finishedWorkerPath =
+        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
+        "/" + getHostnamePartitionId();
+    try {
+      getZkExt().createExt(finishedWorkerPath,
+          workerFinishedInfoObj.toString().getBytes(),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("finishSuperstep: finished worker path " +
+          finishedWorkerPath + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with InterruptedException", e);
+    }
+  }
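
The Base64 blobs written above follow the standard Hadoop Writable
protocol, so a consumer can decode them symmetrically. A sketch of the
reverse direction for the metrics entry (exception handling and imports
such as java.io.ByteArrayInputStream elided):

    // Reverse of the write above: pull the metrics entry back out.
    String encodedMetrics =
        workerFinishedInfoObj.getString(JSONOBJ_METRICS_KEY);
    byte[] metricsBytes = Base64.decode(encodedMetrics);
    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
    metrics.readFields(
        new DataInputStream(new ByteArrayInputStream(metricsBytes)));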
+
+  /**
+   * Save the vertices using the user-defined {@link VertexOutputFormat},
+   * iterating over every partition owned by this worker.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void saveVertices() throws IOException, InterruptedException {
+    if (getConfiguration().getVertexOutputFormatClass() == null) {
+      LOG.warn("saveVertices: " +
+          GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
+          " not specified -- there will be no saved output");
+      return;
+    }
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveVertices: Starting to save vertices");
+    VertexOutputFormat<I, V, E> vertexOutputFormat =
+        getConfiguration().createVertexOutputFormat();
+    VertexWriter<I, V, E> vertexWriter =
+        vertexOutputFormat.createVertexWriter(getContext());
+    vertexWriter.initialize(getContext());
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
+      for (Vertex<I, V, E, M> vertex : partition) {
+        getContext().progress();
+        vertexWriter.writeVertex(vertex);
+      }
+      getContext().progress();
+    }
+    vertexWriter.close(getContext());
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveVertices: Done saving vertices");
+  }
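
For completeness, the warning path above only triggers when no output
format was configured. A hypothetical job-setup snippet that would enable
output (IdWithValueTextOutputFormat is one of Giraph's bundled text
formats; its exact package has moved between versions, so treat the class
name as an assumption):

    GiraphConfiguration conf = new GiraphConfiguration();
    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);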
+
+  @Override
+  public void cleanup() throws IOException, InterruptedException {
+    workerClient.closeConnections();
+    setCachedSuperstep(getSuperstep() - 1);
+    saveVertices();
+    // All worker processes should denote they are done by adding special
+    // znode.  Once the number of znodes equals the number of partitions
+    // for workers and masters, the master will clean up the ZooKeeper
+    // znodes associated with this job.
+    String workerCleanedUpPath = cleanedUpPath + "/" +
+        getTaskPartition() + WORKER_SUFFIX;
+    try {
+      String finalFinishedPath =
+          getZkExt().createExt(workerCleanedUpPath,
+              null,
+              Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT,
+              true);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Notifying master its okay to cleanup with " +
+            finalFinishedPath);
+      }
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Couldn't create finished node '" +
+            workerCleanedUpPath);
+      }
+    } catch (KeeperException e) {
+      // Cleaning up, it's okay to fail after cleanup is successful
+      LOG.error("cleanup: Got KeeperException on notification " +
+          "to master about cleanup", e);
+    } catch (InterruptedException e) {
+      // Cleaning up, it's okay to fail after cleanup is successful
+      LOG.error("cleanup: Got InterruptedException on notification " +
+          "to master about cleanup", e);
+    }
+    try {
+      getZkExt().close();
+    } catch (InterruptedException e) {
+      // cleanup phase -- just log the error
+      LOG.error("cleanup: Zookeeper failed to close with " + e);
+    }
+
+    if (getConfiguration().metricsEnabled()) {
+      GiraphMetrics.get().dumpToStdout();
+    }
+
+    // Preferably would shut down the service only after
+    // all clients have disconnected (or the exceptions on the
+    // client side ignored).
+    workerServer.close();
+  }
+
+  @Override
+  public void storeCheckpoint() throws IOException {
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "storeCheckpoint: Starting checkpoint " +
+            getGraphMapper().getMapFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep());
+
+    // Algorithm:
+    // For each partition, dump vertices and messages
+    Path metadataFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_METADATA_POSTFIX);
+    Path verticesFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_VERTICES_POSTFIX);
+    Path validFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_VALID_POSTFIX);
+
+    // Remove these files if they already exist (they shouldn't, unless
+    // this worker failed previously)
+    if (getFs().delete(validFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed valid file " +
+          validFilePath);
+    }
+    if (getFs().delete(metadataFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed metadata file " +
+          metadataFilePath);
+    }
+    if (getFs().delete(verticesFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
+    }
+
+    FSDataOutputStream verticesOutputStream =
+        getFs().create(verticesFilePath);
+    ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
+    DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
+      long startPos = verticesOutputStream.getPos();
+      partition.write(verticesOutputStream);
+      // write messages
+      getServerData().getCurrentMessageStore().writePartition(
+          verticesOutputStream, partition.getId());
+      // Write the metadata for this partition
+      // Format:
+      // <index count>
+      //   <index 0 start pos><partition id>
+      //   <index 1 start pos><partition id>
+      metadataOutput.writeLong(startPos);
+      metadataOutput.writeInt(partition.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("storeCheckpoint: Vertex file starting " +
+            "offset = " + startPos + ", length = " +
+            (verticesOutputStream.getPos() - startPos) +
+            ", partition = " + partition.toString());
+      }
+      getContext().progress();
+    }
+    // Metadata is buffered and written at the end since it's small and
+    // needs to know how many partitions this worker owns
+    FSDataOutputStream metadataOutputStream =
+        getFs().create(metadataFilePath);
+    metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
+    metadataOutputStream.write(metadataByteStream.toByteArray());
+    metadataOutputStream.close();
+    verticesOutputStream.close();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("storeCheckpoint: Finished metadata (" +
+          metadataFilePath + ") and vertices (" + verticesFilePath + ").");
+    }
+
+    getFs().createNewFile(validFilePath);
+
+    // Notify master that checkpoint is stored
+    String workerWroteCheckpoint =
+        getWorkerWroteCheckpointPath(getApplicationAttempt(),
+            getSuperstep()) + "/" + getHostnamePartitionId();
+    try {
+      getZkExt().createExt(workerWroteCheckpoint,
+          new byte[0],
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("finishSuperstep: wrote checkpoint worker path " +
+          workerWroteCheckpoint + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " +
+          workerWroteCheckpoint +
+          " failed with InterruptedException", e);
+    }
+  }
+
+  @Override
+  public VertexEdgeCount loadCheckpoint(long superstep) {
+    try {
+      // clear old message stores
+      getServerData().getIncomingMessageStore().clearAll();
+      getServerData().getCurrentMessageStore().clearAll();
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "loadCheckpoint: Failed to clear message stores ", e);
+    }
+
+    // Algorithm:
+    // Examine all the partition owners and load the ones
+    // that match my hostname and id from the master designated checkpoint
+    // prefixes.
+    long startPos = 0;
+    int loadedPartitions = 0;
+    for (PartitionOwner partitionOwner :
+      workerGraphPartitioner.getPartitionOwners()) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
+        String metadataFile =
+            partitionOwner.getCheckpointFilesPrefix() +
+            CHECKPOINT_METADATA_POSTFIX;
+        String partitionsFile =
+            partitionOwner.getCheckpointFilesPrefix() +
+            CHECKPOINT_VERTICES_POSTFIX;
+        try {
+          int partitionId = -1;
+          DataInputStream metadataStream =
+              getFs().open(new Path(metadataFile));
+          int partitions = metadataStream.readInt();
+          for (int i = 0; i < partitions; ++i) {
+            startPos = metadataStream.readLong();
+            partitionId = metadataStream.readInt();
+            if (partitionId == partitionOwner.getPartitionId()) {
+              break;
+            }
+          }
+          if (partitionId != partitionOwner.getPartitionId()) {
+            throw new IllegalStateException(
+                "loadCheckpoint: " + partitionOwner +
+                " not found!");
+          }
+          metadataStream.close();
+          Partition<I, V, E, M> partition =
+              getConfiguration().createPartition(partitionId, getContext());
+          DataInputStream partitionsStream =
+              getFs().open(new Path(partitionsFile));
+          if (partitionsStream.skip(startPos) != startPos) {
+            throw new IllegalStateException(
+                "loadCheckpoint: Failed to skip " + startPos +
+                " on " + partitionsFile);
+          }
+          partition.readFields(partitionsStream);
+          if (partitionsStream.readBoolean()) {
+            getServerData().getCurrentMessageStore().readFieldsForPartition(
+                partitionsStream, partitionId);
+          }
+          partitionsStream.close();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("loadCheckpoint: Loaded partition " +
+                partition);
+          }
+          if (getPartitionStore().hasPartition(partitionId)) {
+            throw new IllegalStateException(
+                "loadCheckpoint: Already has partition owner " +
+                    partitionOwner);
+          }
+          getPartitionStore().addPartition(partition);
+          getContext().progress();
+          ++loadedPartitions;
+        } catch (IOException e) {
+          throw new RuntimeException(
+              "loadCheckpoint: Failed to get partition owner " +
+                  partitionOwner, e);
+        }
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
+          " partitions of out " +
+          workerGraphPartitioner.getPartitionOwners().size() +
+          " total.");
+    }
+
+    // Load global statistics
+    GlobalStats globalStats = null;
+    String finalizedCheckpointPath =
+        getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+    try {
+      DataInputStream finalizedStream =
+          getFs().open(new Path(finalizedCheckpointPath));
+      globalStats = new GlobalStats();
+      globalStats.readFields(finalizedStream);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "loadCheckpoint: Failed to load global statistics", e);
+    }
+
+    // Communication service needs to setup the connections prior to
+    // processing vertices
+/*if[HADOOP_NON_SECURE]
+    workerClient.setup();
+else[HADOOP_NON_SECURE]*/
+    workerClient.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
+    return new VertexEdgeCount(globalStats.getVertexCount(),
+        globalStats.getEdgeCount());
+  }
+
+  /**
+   * Send the worker partitions to their destination workers
+   *
+   * @param workerPartitionMap Map of worker info to the partitions stored
+   *        on this worker to be sent
+   */
+  private void sendWorkerPartitions(
+      Map<WorkerInfo, List<Integer>> workerPartitionMap) {
+    List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
+        new ArrayList<Entry<WorkerInfo, List<Integer>>>(
+            workerPartitionMap.entrySet());
+    Collections.shuffle(randomEntryList);
+    WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E, M>(getContext(),
+            getConfiguration(), this);
+    for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
+      randomEntryList) {
+      for (Integer partitionId : workerPartitionList.getValue()) {
+        Partition<I, V, E, M> partition =
+            getPartitionStore().removePartition(partitionId);
+        if (partition == null) {
+          throw new IllegalStateException(
+              "sendWorkerPartitions: Couldn't find partition " +
+                  partitionId + " to send to " +
+                  workerPartitionList.getKey());
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("sendWorkerPartitions: Sending worker " +
+              workerPartitionList.getKey() + " partition " +
+              partitionId);
+        }
+        workerClientRequestProcessor.sendPartitionRequest(
+            workerPartitionList.getKey(),
+            partition);
+      }
+    }
+
+    try {
+      workerClientRequestProcessor.flush();
+      workerClient.waitAllRequests();
+    } catch (IOException e) {
+      throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
+    }
+    String myPartitionExchangeDonePath =
+        getPartitionExchangeWorkerPath(
+            getApplicationAttempt(), getSuperstep(), getWorkerInfo());
+    try {
+      getZkExt().createExt(myPartitionExchangeDonePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "sendWorkerPartitions: KeeperException to create " +
+              myPartitionExchangeDonePath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "sendWorkerPartitions: InterruptedException to create " +
+              myPartitionExchangeDonePath, e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("sendWorkerPartitions: Done sending all my partitions.");
+    }
+  }
+
+  @Override
+  public final void exchangeVertexPartitions(
+      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
+    // 1. Fix the addresses of the partition ids if they have changed.
+    // 2. Send all the partitions to their destination workers in a random
+    //    fashion.
+    // 3. Notify completion with a ZooKeeper stamp
+    // 4. Wait for all my dependencies to be done (if any)
+    // 5. Add the partitions to myself.
+    PartitionExchange partitionExchange =
+        workerGraphPartitioner.updatePartitionOwners(
+            getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+    workerClient.openConnections();
+
+    Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
+        partitionExchange.getSendWorkerPartitionMap();
+    if (!getPartitionStore().isEmpty()) {
+      sendWorkerPartitions(sendWorkerPartitionMap);
+    }
+
+    Set<WorkerInfo> myDependencyWorkerSet =
+        partitionExchange.getMyDependencyWorkerSet();
+    Set<String> workerIdSet = new HashSet<String>();
+    for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
+      if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
+        throw new IllegalStateException(
+            "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
+      }
+    }
+    if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
+            "exiting early");
+      }
+      return;
+    }
+
+    String vertexExchangePath =
+        getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
+    List<String> workerDoneList;
+    try {
+      while (true) {
+        workerDoneList = getZkExt().getChildrenExt(
+            vertexExchangePath, true, false, false);
+        workerIdSet.removeAll(workerDoneList);
+        if (workerIdSet.isEmpty()) {
+          break;
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("exchangeVertexPartitions: Waiting for workers " +
+              workerIdSet);
+        }
+        getPartitionExchangeChildrenChangedEvent().waitForever();
+        getPartitionExchangeChildrenChangedEvent().reset();
+      }
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("exchangeVertexPartitions: Done with exchange.");
+    }
+  }
+
+  /**
+   * Get event when the state of a partition exchange has changed.
+   *
+   * @return Event to check.
+   */
+  public final BspEvent getPartitionExchangeChildrenChangedEvent() {
+    return partitionExchangeChildrenChanged;
+  }
+
+  @Override
+  protected boolean processEvent(WatchedEvent event) {
+    boolean foundEvent = false;
+    if (event.getPath().startsWith(masterJobStatePath) &&
+        (event.getType() == EventType.NodeChildrenChanged)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("processEvent: Job state changed, checking " +
+            "to see if it needs to restart");
+      }
+      JSONObject jsonObj = getJobState();
+      try {
+        if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
+            ApplicationState.START_SUPERSTEP) &&
+            jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
+            getApplicationAttempt()) {
+          LOG.fatal("processEvent: Worker will restart " +
+              "from command - " + jsonObj.toString());
+          System.exit(-1);
+        }
+      } catch (JSONException e) {
+        throw new RuntimeException(
+            "processEvent: Couldn't properly get job state from " +
+                jsonObj.toString(), e);
+      }
+      foundEvent = true;
+    } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("processEvent : partitionExchangeChildrenChanged " +
+            "(at least one worker is done sending partitions)");
+      }
+      partitionExchangeChildrenChanged.signal();
+      foundEvent = true;
+    }
+
+    return foundEvent;
+  }
+
+  @Override
+  public WorkerInfo getWorkerInfo() {
+    return workerInfo;
+  }
+
+  @Override
+  public PartitionStore<I, V, E, M> getPartitionStore() {
+    return getServerData().getPartitionStore();
+  }
+
+  @Override
+  public PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return workerGraphPartitioner.getPartitionOwner(vertexId);
+  }
+
+  @Override
+  public Iterable<? extends PartitionOwner> getPartitionOwners() {
+    return workerGraphPartitioner.getPartitionOwners();
+  }
+
+  @Override
+  public Partition<I, V, E, M> getPartition(I vertexId) {
+    return getPartitionStore().getPartition(getPartitionId(vertexId));
+  }
+
+  @Override
+  public Integer getPartitionId(I vertexId) {
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
+    return partitionOwner.getPartitionId();
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return getPartitionStore().hasPartition(partitionId);
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getVertex(I vertexId) {
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
+    if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) {
+      return getPartitionStore().getPartition(
+          partitionOwner.getPartitionId()).getVertex(vertexId);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public ServerData<I, V, E, M> getServerData() {
+    return workerServer.getServerData();
+  }
+
+  @Override
+  public WorkerAggregatorHandler getAggregatorHandler() {
+    return aggregatorHandler;
+  }
+
+  @Override
+  public void prepareSuperstep() {
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerContext.java
new file mode 100644
index 0000000..0ffdc0f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+/**
+ * A no-op implementation of {@link WorkerContext}. This is the default
+ * implementation used when the user does not define a WorkerContext; all
+ * of its hooks do nothing.
+ */
+public class DefaultWorkerContext extends WorkerContext {
+
+  @Override
+  public void preApplication()
+    throws InstantiationException, IllegalAccessException {
+  }
+
+  @Override
+  public void postApplication() { }
+
+  @Override
+  public void preSuperstep() { }
+
+  @Override
+  public void postSuperstep() { }
+}
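
For contrast with the no-op default, a user-defined context overrides
whichever hooks it needs. A hypothetical example that just logs superstep
boundaries (the class is invented; getSuperstep() is assumed to be
available on WorkerContext as in other worker-side APIs):

    /** Hypothetical user context that logs each superstep boundary. */
    public class LoggingWorkerContext extends DefaultWorkerContext {
      private static final Logger LOG =
          Logger.getLogger(LoggingWorkerContext.class);

      @Override
      public void preSuperstep() {
        LOG.info("Starting superstep " + getSuperstep());
      }

      @Override
      public void postSuperstep() {
        LOG.info("Finished superstep " + getSuperstep());
      }
    }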

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
new file mode 100644
index 0000000..23e2ff7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.yammer.metrics.core.Counter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Load as many edge input splits as possible.
+ * Every thread has its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeInputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends InputSplitsCallable<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(
+      EdgeInputSplitsCallable.class);
+  /** Total edges loaded */
+  private long totalEdgesLoaded = 0;
+  /** Input split max edges (-1 denotes all) */
+  private final long inputSplitMaxEdges;
+
+  // Metrics
+  /** number of edges loaded counter */
+  private final Counter edgesLoadedCounter;
+
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker Service worker
+   * @param inputSplitPathList List of the paths of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   */
+  public EdgeInputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt)  {
+    super(context, graphState, configuration, bspServiceWorker,
+        inputSplitPathList, workerInfo, zooKeeperExt,
+        BspServiceWorker.EDGE_INPUT_SPLIT_RESERVED_NODE,
+        BspServiceWorker.EDGE_INPUT_SPLIT_FINISHED_NODE,
+        bspServiceWorker.getEdgeInputSplitsEvents());
+
+    inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
+
+    // Initialize Metrics
+    GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob();
+    edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED);
+  }
+
+  /**
+   * Read edges from input split.  If testing, the user may request a
+   * maximum number of edges to be read from an input split.
+   *
+   * @param inputSplit Input split to process with edge reader
+   * @param graphState Current graph state
+   * @return Edges loaded from this input split
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  protected VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState) throws IOException,
+      InterruptedException {
+    EdgeInputFormat<I, E> edgeInputFormat =
+        configuration.createEdgeInputFormat();
+    EdgeReader<I, E> edgeReader =
+        edgeInputFormat.createEdgeReader(inputSplit, context);
+    edgeReader.initialize(inputSplit, context);
+    long inputSplitEdgesLoaded = 0;
+    while (edgeReader.nextEdge()) {
+      EdgeWithSource<I, E> readerEdge = edgeReader.getCurrentEdge();
+      if (readerEdge.getSourceVertexId() == null) {
+        throw new IllegalArgumentException(
+            "readInputSplit: Edge reader returned an edge " +
+                "without a source vertex id!  - " + readerEdge);
+      }
+      if (readerEdge.getEdge().getTargetVertexId() == null) {
+        throw new IllegalArgumentException(
+            "readInputSplit: Edge reader returned an edge " +
+                "without a target vertex id!  - " + readerEdge);
+      }
+      if (readerEdge.getEdge().getValue() == null) {
+        throw new IllegalArgumentException(
+            "readInputSplit: Edge reader returned an edge " +
+                "without a value!  - " + readerEdge);
+      }
+
+      graphState.getWorkerClientRequestProcessor().addEdgeRequest(
+          readerEdge.getSourceVertexId(), readerEdge.getEdge());
+      context.progress(); // do this before potential data transfer
+      ++inputSplitEdgesLoaded;
+
+      // Update status every 1M edges
+      if (((inputSplitEdgesLoaded + totalEdgesLoaded) % 1000000) == 0) {
+        LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
+            "readInputSplit: Loaded " +
+                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
+                MemoryUtils.getRuntimeMemoryStats());
+      }
+
+      // For sampling, or to limit outlier input splits, the number of
+      // records per input split can be limited
+      if (inputSplitMaxEdges > 0 &&
+          inputSplitEdgesLoaded >= inputSplitMaxEdges) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("readInputSplit: Leaving the input " +
+              "split early, reached maximum edges " +
+              inputSplitEdgesLoaded);
+        }
+        break;
+      }
+    }
+    edgeReader.close();
+    totalEdgesLoaded += inputSplitEdgesLoaded;
+    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
+    return new VertexEdgeCount(0, inputSplitEdgesLoaded);
+  }
+}
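
[Editorial note] Condensed, the read loop above is a validate-forward-count pattern with an early exit once the per-split cap is reached. This sketch uses hypothetical local names (`reader`, `processor`, `maxEdges`) to show just that skeleton:

    long loaded = 0;
    while (reader.nextEdge()) {
      EdgeWithSource<I, E> e = reader.getCurrentEdge(); // validated first
      // forward to whichever worker owns the source vertex
      processor.addEdgeRequest(e.getSourceVertexId(), e.getEdge());
      // a positive cap enables sampling / outlier-split limiting, as above
      if (maxEdges > 0 && ++loaded >= maxEdges) {
        break;
      }
    }
    reader.close();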

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
new file mode 100644
index 0000000..1a9a744
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.List;
+
+/**
+ * Factory for {@link EdgeInputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements InputSplitsCallableFactory<I, V, E, M> {
+  /** Mapper context. */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state. */
+  private final GraphState<I, V, E, M> graphState;
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** {@link BspServiceWorker} we're running on. */
+  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+  /** List of input split paths. */
+  private final List<String> inputSplitPathList;
+  /** Worker info. */
+  private final WorkerInfo workerInfo;
+  /** {@link ZooKeeperExt} for this worker. */
+  private final ZooKeeperExt zooKeeperExt;
+
+  /**
+   * Constructor.
+   *
+   * @param context Mapper context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker Calling {@link BspServiceWorker}
+   * @param inputSplitPathList List of input split paths
+   * @param workerInfo Worker info
+   * @param zooKeeperExt {@link ZooKeeperExt} for this worker
+   */
+  public EdgeInputSplitsCallableFactory(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt) {
+    this.context = context;
+    this.graphState = graphState;
+    this.configuration = configuration;
+    this.bspServiceWorker = bspServiceWorker;
+    this.inputSplitPathList = inputSplitPathList;
+    this.workerInfo = workerInfo;
+    this.zooKeeperExt = zooKeeperExt;
+  }
+
+  @Override
+  public InputSplitsCallable<I, V, E, M> newCallable() {
+    return new EdgeInputSplitsCallable<I, V, E, M>(
+        context,
+        graphState,
+        configuration,
+        bspServiceWorker,
+        inputSplitPathList,
+        workerInfo,
+        zooKeeperExt);
+  }
+}
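
[Editorial note] As a hedged sketch of how such a factory is consumed: each input loader thread gets its own callable, and therefore its own request processor. The executor wiring below is illustrative only and assumes InputSplitsCallable implements Callable<VertexEdgeCount>; actual thread management lives elsewhere in giraph-core:

    InputSplitsCallableFactory<I, V, E, M> factory =
        new EdgeInputSplitsCallableFactory<I, V, E, M>(
            context, graphState, configuration, bspServiceWorker,
            inputSplitPathList, workerInfo, zooKeeperExt);
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    List<Future<VertexEdgeCount>> futures =
        new ArrayList<Future<VertexEdgeCount>>();
    for (int i = 0; i < numThreads; ++i) {
      // one callable per thread, hence one request processor per thread
      futures.add(pool.submit(factory.newCallable()));
    }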

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
new file mode 100644
index 0000000..b82da7d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility class to extract the list of InputSplits from the
+ * ZooKeeper tree of "claimable splits" the master created,
+ * and to sort the list to favor local data blocks.
+ *
+ * This class provides an Iterator over the list the worker will
+ * claim splits from, keeping the sorting and data locality
+ * processing done here invisible to callers. The aim is to cut
+ * down on the number of ZooKeeper reads workers perform before
+ * locating an unclaimed InputSplit.
+ */
+public class InputSplitPathOrganizer implements Iterable<String> {
+  /** The worker's local ZooKeeperExt ref */
+  private final ZooKeeperExt zooKeeper;
+  /** The List of InputSplit znode paths */
+  private final List<String> pathList;
+  /** The worker's hostname */
+  private final String hostName;
+  /** The adjusted base offset by which to iterate on the path list */
+  private int baseOffset;
+
+  /**
+   * Constructor
+   *
+   * @param zooKeeper the worker's ZkExt
+   * @param zkPathList the path to read from
+   * @param hostName the worker's host name (for matching)
+   * @param port the port number for this worker
+   */
+  public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
+    final String zkPathList, final String hostName, final int port)
+    throws KeeperException, InterruptedException {
+    this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
+        hostName, port);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param zooKeeper the worker's ZkExt
+   * @param inputSplitPathList path of input splits to read from
+   * @param hostName the worker's host name (for matching)
+   * @param port the port number for this worker
+   */
+  public InputSplitPathOrganizer(
+      final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
+      final String hostName, final int port)
+    throws KeeperException, InterruptedException {
+    this.zooKeeper = zooKeeper;
+    this.pathList = Lists.newArrayList(inputSplitPathList);
+    this.hostName = hostName;
+    this.baseOffset = 0; // set later after switching out local paths
+    prioritizeLocalInputSplits(port);
+  }
+
+  /**
+   * Re-order the list of InputSplits so blocks local to this worker node's
+   * disk are the first it iterates over when attempting to claim a split
+   * to read. The probability of a local read increases as the fraction of
+   * cluster nodes hosting both data and workers approaches 100%, and
+   * replication further increases the chance of a local "hit."
+   *
+   * @param port the port number, used to hash a unique iteration offset
+   *             for each worker, even workers sharing the same host node.
+   */
+  private void prioritizeLocalInputSplits(final int port) {
+    List<String> sortedList = new ArrayList<String>();
+    String hosts = null;
+    for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
+      final String path = iterator.next();
+      try {
+        hosts = getLocationsFromZkInputSplitData(path);
+      } catch (IOException ioe) {
+        hosts = null; // no problem, just don't sort this entry
+      } catch (KeeperException ke) {
+        hosts = null;
+      } catch (InterruptedException ie) {
+        hosts = null;
+      }
+      if (hosts != null && hosts.contains(hostName)) {
+        sortedList.add(path); // collect the local block
+        iterator.remove(); // remove local block from list
+      }
+    }
+    // shuffle the local blocks in case several workers exist on this host
+    Collections.shuffle(sortedList);
+    // determine the hash-based offset for this worker to iterate from
+    // and place the local blocks into the list at that index, if any
+    final int hash = hostName.hashCode() + (19 * port);
+    if (!pathList.isEmpty()) {
+      baseOffset = Math.abs(hash % pathList.size());
+    }
+    // re-insert local paths at "adjusted index zero" for caller to iterate on
+    pathList.addAll(baseOffset, sortedList);
+  }
+
+  /**
+   * Utility for extracting locality data from an InputSplit ZNode.
+   *
+   * @param zkSplitPath the input split znode path from which to read
+   *                    locality data for this InputSplit
+   * @return a String of hostnames read from the ZNode data
+   */
+  private String getLocationsFromZkInputSplitData(String zkSplitPath)
+    throws IOException, KeeperException, InterruptedException {
+    byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
+    DataInputStream inputStream =
+      new DataInputStream(new ByteArrayInputStream(locationData));
+    // only read the "first" entry in the znode data, the locations
+    return Text.readString(inputStream);
+  }
+
+  /**
+   * Utility accessor for Input Split znode path list size
+   *
+   * @return the size of <code>this.pathList</code>
+   */
+  public int getPathListSize() {
+    return this.pathList.size();
+  }
+
+  /**
+   * Iterator for the pathList
+   *
+   * @return an iterator for our list of input split paths
+   */
+  @Override
+  public Iterator<String> iterator() {
+    return new PathListIterator();
+  }
+
+  /**
+   * Iterator for path list that handles the locality and hash offsetting.
+   */
+  public class PathListIterator implements Iterator<String> {
+    /** the current iterator index */
+    private int currentIndex = 0;
+
+    /**
+     *  Do we have more list to iterate upon?
+     *
+     *  @return true if more path strings are available
+     */
+    @Override
+    public boolean hasNext() {
+      return currentIndex < pathList.size();
+    }
+
+    /**
+     * Return the next pathList element
+     *
+     * @return the next input split path
+     */
+    @Override
+    public String next() {
+      return pathList.get((baseOffset + currentIndex++) % pathList.size());
+    }
+
+    /** Removal is not supported; always throws. */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not allowed.");
+    }
+  }
+}
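
[Editorial note] To make the offset arithmetic concrete: suppose hostName.hashCode() + (19 * port) is 7 and five non-local paths remain after local ones are pulled out; then baseOffset = Math.abs(7 % 5) = 2, the local paths are spliced back in at index 2, and the iterator visits indices 2, 3, 4, ... wrapping modulo the final list size, so each worker starts its claim attempts at a different, locality-favoring position. A hedged usage sketch, where `zk`, `splitsZkPath`, `host`, and `port` are illustrative names and exception handling is elided:

    // Iterate candidate split paths, most-local first.
    // (KeeperException / InterruptedException handling elided.)
    InputSplitPathOrganizer organizer =
        new InputSplitPathOrganizer(zk, splitsZkPath, host, port);
    for (String splitPath : organizer) {
      // try to claim this znode; stop once a claim succeeds
    }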