Posted to commits@giraph.apache.org by cl...@apache.org on 2013/08/26 22:15:48 UTC

[1/2] GIRAPH-732

Updated Branches:
  refs/heads/trunk fa6b75495 -> ae01f0399


http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig
new file mode 100644
index 0000000..9311fbd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig
@@ -0,0 +1,1535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerClient;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerServer;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GlobalStats;
+import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.InputSplitPaths;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.io.superstep_output.SuperstepOutput;
+import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.master.SuperstepClasses;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphTimer;
+import org.apache.giraph.metrics.GiraphTimerContext;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionExchange;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.JMapHistoDumper;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import net.iharder.Base64;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public class BspServiceWorker<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends BspService<I, V, E>
+    implements CentralizedServiceWorker<I, V, E>,
+    ResetSuperstepMetricsObserver {
+  /** Name of timer for time spent waiting on other workers */
+  public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
+  /** My process health znode */
+  private String myHealthZnode;
+  /** Worker info */
+  private final WorkerInfo workerInfo;
+  /** Worker graph partitioner */
+  private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
+
+  /** IPC Client */
+  private final WorkerClient<I, V, E> workerClient;
+  /** IPC Server */
+  private final WorkerServer<I, V, E> workerServer;
+  /** Request processor for aggregator requests */
+  private final WorkerAggregatorRequestProcessor
+  workerAggregatorRequestProcessor;
+  /** Master info */
+  private MasterInfo masterInfo = new MasterInfo();
+  /** List of workers */
+  private List<WorkerInfo> workerInfoList = Lists.newArrayList();
+  /** Have the partition exchange children (workers) changed? */
+  private final BspEvent partitionExchangeChildrenChanged;
+
+  /** Worker Context */
+  private final WorkerContext workerContext;
+
+  /** Handler for aggregators */
+  private final WorkerAggregatorHandler aggregatorHandler;
+
+  /** Superstep output */
+  private SuperstepOutput<I, V, E> superstepOutput;
+
+  /** array of observers to call back to */
+  private final WorkerObserver[] observers;
+
+  // Per-Superstep Metrics
+  /** Timer for WorkerContext#postSuperstep */
+  private GiraphTimer wcPostSuperstepTimer;
+  /** Time spent waiting on requests to finish */
+  private GiraphTimer waitRequestsTimer;
+
+  /**
+   * Constructor for setting up the worker.
+   *
+   * @param serverPortList ZooKeeper server port list
+   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
+   * @param context Mapper context
+   * @param graphTaskManager GraphTaskManager for this compute node
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public BspServiceWorker(
+    String serverPortList,
+    int sessionMsecTimeout,
+    Mapper<?, ?, ?, ?>.Context context,
+    GraphTaskManager<I, V, E> graphTaskManager)
+    throws IOException, InterruptedException {
+    super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
+    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
+    partitionExchangeChildrenChanged = new PredicateLock(context);
+    registerBspEvent(partitionExchangeChildrenChanged);
+    workerGraphPartitioner =
+        getGraphPartitionerFactory().createWorkerGraphPartitioner();
+    workerInfo = new WorkerInfo();
+    workerServer = new NettyWorkerServer<I, V, E>(conf, this, context);
+    workerInfo.setInetSocketAddress(workerServer.getMyAddress());
+    workerInfo.setTaskId(getTaskPartition());
+    workerClient = new NettyWorkerClient<I, V, E>(context, conf, this);
+
+    workerAggregatorRequestProcessor =
+        new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
+
+    aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
+
+    workerContext = conf.createWorkerContext();
+    workerContext.setWorkerAggregatorUsage(aggregatorHandler);
+
+    superstepOutput = conf.createSuperstepOutput(context);
+
+    if (conf.isJMapHistogramDumpEnabled()) {
+      conf.addWorkerObserverClass(JMapHistoDumper.class);
+    }
+    observers = conf.createWorkerObservers();
+
+    GiraphMetrics.get().addSuperstepResetObserver(this);
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    waitRequestsTimer = new GiraphTimer(superstepMetrics,
+        TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
+    wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
+        "worker-context-post-superstep", TimeUnit.MICROSECONDS);
+  }
+
+  @Override
+  public WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  @Override
+  public WorkerObserver[] getWorkerObservers() {
+    return observers;
+  }
+
+  @Override
+  public WorkerClient<I, V, E> getWorkerClient() {
+    return workerClient;
+  }
+
+  /**
+   * Intended to check the health of the node.  For instance, can it ssh,
+   * run dmesg, etc.? For now, it does nothing and always reports healthy.
+   * TODO: Make this check configurable by the user (e.g. search dmesg for
+   * problems).
+   *
+   * @return True if healthy (always in this case).
+   */
+  public boolean isHealthy() {
+    return true;
+  }
+
+  /**
+   * Load the vertices/edges from input splits. Do this until all the
+   * InputSplits have been processed.
+   * All workers will try to do as many InputSplits as they can.  The master
+   * will monitor progress and stop this once all the InputSplits have been
+   * loaded and check-pointed.  Keep track of the last input split path to
+   * ensure the input split cache is flushed prior to marking the last input
+   * split complete.
+   *
+   * Use one or more threads to do the loading.
+   *
+   * @param inputSplitPathList List of input split paths
+   * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
+   * @return Statistics of the vertices and edges loaded
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  private VertexEdgeCount loadInputSplits(
+      List<String> inputSplitPathList,
+      CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
+    throws KeeperException, InterruptedException {
+    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+    // Determine how many threads to use based on the number of input splits
+    int maxInputSplitThreads = (inputSplitPathList.size() - 1) /
+        getConfiguration().getMaxWorkers() + 1;
+    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+        maxInputSplitThreads);
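+    // Illustrative arithmetic (hypothetical numbers, not from this job):
+    // with 10 input splits and 4 workers, maxInputSplitThreads =
+    // (10 - 1) / 4 + 1 = 3, so at most 3 loading threads are used here
+    // even if more input-split threads were configured.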
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
+          "originally " + getConfiguration().getNumInputSplitsThreads() +
+          " threads(s) for " + inputSplitPathList.size() + " total splits.");
+    }
+
+    List<VertexEdgeCount> results =
+        ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
+            numThreads, "load-%d", getContext());
+    for (VertexEdgeCount result : results) {
+      vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
+    }
+
+    workerClient.waitAllRequests();
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Load the vertices from the user-defined
+   * {@link org.apache.giraph.io.VertexReader}
+   *
+   * @return Count of vertices and edges loaded
+   */
+  private VertexEdgeCount loadVertices() throws KeeperException,
+      InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
+            false, false, true);
+
+    InputSplitPathOrganizer splitOrganizer =
+        new InputSplitPathOrganizer(getZkExt(),
+            inputSplitPathList, getWorkerInfo().getHostname(),
+            getConfiguration().useInputSplitLocality());
+    InputSplitsHandler splitsHandler = new InputSplitsHandler(
+        splitOrganizer,
+        getZkExt(),
+        getContext(),
+        BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
+        BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
+
+    VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
+        new VertexInputSplitsCallableFactory<I, V, E>(
+            getConfiguration().createWrappedVertexInputFormat(),
+            getContext(),
+            getConfiguration(),
+            this,
+            splitsHandler,
+            getZkExt());
+
+    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
+  }
+
+  /**
+   * Load the edges from the user-defined
+   * {@link org.apache.giraph.io.EdgeReader}.
+   *
+   * @return Number of edges loaded
+   */
+  private long loadEdges() throws KeeperException, InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
+            false, false, true);
+
+    InputSplitPathOrganizer splitOrganizer =
+        new InputSplitPathOrganizer(getZkExt(),
+            inputSplitPathList, getWorkerInfo().getHostname(),
+            getConfiguration().useInputSplitLocality());
+    InputSplitsHandler splitsHandler = new InputSplitsHandler(
+        splitOrganizer,
+        getZkExt(),
+        getContext(),
+        BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
+        BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
+
+    EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
+        new EdgeInputSplitsCallableFactory<I, V, E>(
+            getConfiguration().createWrappedEdgeInputFormat(),
+            getContext(),
+            getConfiguration(),
+            this,
+            splitsHandler,
+            getZkExt());
+
+    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
+        getEdgeCount();
+  }
+
+  @Override
+  public MasterInfo getMasterInfo() {
+    return masterInfo;
+  }
+
+  @Override
+  public List<WorkerInfo> getWorkerInfoList() {
+    return workerInfoList;
+  }
+
+  /**
+   * Ensure the input splits are ready for processing
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
+                                      InputSplitEvents inputSplitEvents) {
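+    // Standard ZooKeeper watch-and-wait pattern: exists() with watch=true
+    // both checks for the "all ready" znode and registers a watcher; if the
+    // znode is absent we block on the matching BspEvent until the watcher
+    // fires, then re-check.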
+    while (true) {
+      Stat inputSplitsReadyStat;
+      try {
+        inputSplitsReadyStat = getZkExt().exists(
+            inputSplitPaths.getAllReadyPath(), true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "KeeperException waiting on input splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "InterruptedException waiting on input splits", e);
+      }
+      if (inputSplitsReadyStat != null) {
+        break;
+      }
+      inputSplitEvents.getAllReadyChanged().waitForever();
+      inputSplitEvents.getAllReadyChanged().reset();
+    }
+  }
+
+  /**
+   * Wait for all workers to finish processing input splits.
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
+                                   InputSplitEvents inputSplitEvents) {
+    String workerInputSplitsDonePath =
+        inputSplitPaths.getDonePath() + "/" +
+            getWorkerInfo().getHostnameId();
+    try {
+      getZkExt().createExt(workerInputSplitsDonePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "KeeperException creating worker done splits", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "InterruptedException creating worker done splits", e);
+    }
+    while (true) {
+      Stat inputSplitsDoneStat;
+      try {
+        inputSplitsDoneStat =
+            getZkExt().exists(inputSplitPaths.getAllDonePath(),
+                true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "KeeperException waiting on worker done splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "InterruptedException waiting on worker done splits", e);
+      }
+      if (inputSplitsDoneStat != null) {
+        break;
+      }
+      inputSplitEvents.getAllDoneChanged().waitForever();
+      inputSplitEvents.getAllDoneChanged().reset();
+    }
+  }
+
+  @Override
+  public FinishedSuperstepStats setup() {
+    // Unless doing a restart, prepare for computation:
+    // 1. Start superstep INPUT_SUPERSTEP (no computation)
+    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
+    // 3. Process input splits until there are no more.
+    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
+    // 5. Process any mutations deriving from add edge requests
+    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
+    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+      setCachedSuperstep(getRestartedSuperstep());
+      return new FinishedSuperstepStats(0, false, 0, 0, true);
+    }
+
+    JSONObject jobState = getJobState();
+    if (jobState != null) {
+      try {
+        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
+            ApplicationState.START_SUPERSTEP) &&
+            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
+            getSuperstep()) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("setup: Restarting from an automated " +
+                "checkpointed superstep " +
+                getSuperstep() + ", attempt " +
+                getApplicationAttempt());
+          }
+          setRestartedSuperstep(getSuperstep());
+          return new FinishedSuperstepStats(0, false, 0, 0, true);
+        }
+      } catch (JSONException e) {
+        throw new RuntimeException(
+            "setup: Failed to get key-values from " +
+                jobState.toString(), e);
+      }
+    }
+
+    // Add the partitions that this worker owns
+    Collection<? extends PartitionOwner> masterSetPartitionOwners =
+        startSuperstep();
+    workerGraphPartitioner.updatePartitionOwners(
+        getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+
+    /*if[HADOOP_NON_SECURE]
+      workerClient.setup();
+    else[HADOOP_NON_SECURE]*/
+    workerClient.setup(getConfiguration().authenticate());
+    /*end[HADOOP_NON_SECURE]*/
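+    // Note: the if[HADOOP_NON_SECURE]/else/end markers above are munge-style
+    // preprocessor directives; the build compiles exactly one of the two
+    // branches depending on whether it targets a secure Hadoop profile.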
+
+    // Initialize aggregator at worker side during setup.
+    // Do this just before vertex and edge loading.
+    aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+
+    VertexEdgeCount vertexEdgeCount;
+
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Ensure the vertex InputSplits are ready for processing
+      ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
+      getContext().progress();
+      try {
+        vertexEdgeCount = loadVertices();
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadVertices failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadVertices failed with KeeperException", e);
+      }
+      getContext().progress();
+    } else {
+      vertexEdgeCount = new VertexEdgeCount();
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Ensure the edge InputSplits are ready for processing
+      ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
+      getContext().progress();
+      try {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadEdges failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadEdges failed with KeeperException", e);
+      }
+      getContext().progress();
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
+    }
+
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
+    }
+
+    // Create remaining partitions owned by this worker.
+    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
+          !getPartitionStore().hasPartition(
+              partitionOwner.getPartitionId())) {
+        Partition<I, V, E> partition =
+            getConfiguration().createPartition(
+                partitionOwner.getPartitionId(), getContext());
+        getPartitionStore().addPartition(partition);
+      }
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Move edges from temporary storage to their source vertices.
+      getServerData().getEdgeStore().moveEdgesToVertices();
+    }
+
+    // Generate the partition stats for the input superstep and process
+    // if necessary
+    List<PartitionStats> partitionStatsList =
+        new ArrayList<PartitionStats>();
+    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+      Partition<I, V, E> partition =
+          getPartitionStore().getPartition(partitionId);
+      PartitionStats partitionStats =
+          new PartitionStats(partition.getId(),
+              partition.getVertexCount(),
+              0,
+              partition.getEdgeCount(),
+              0, 0);
+      partitionStatsList.add(partitionStats);
+      getPartitionStore().putPartition(partition);
+    }
+    workerGraphPartitioner.finalizePartitionStats(
+        partitionStatsList, getPartitionStore());
+
+    return finishSuperstep(partitionStatsList);
+  }
+
+  /**
+   * Register the health of this worker for a given superstep
+   *
+   * @param superstep Superstep to register health on
+   */
+  private void registerHealth(long superstep) {
+    JSONArray hostnamePort = new JSONArray();
+    hostnamePort.put(getHostname());
+
+    hostnamePort.put(workerInfo.getPort());
+
+    String myHealthPath = null;
+    if (isHealthy()) {
+      myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
+          getSuperstep());
+    } else {
+      myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
+          getSuperstep());
+    }
+    myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
+    try {
+      myHealthZnode = getZkExt().createExt(
+          myHealthPath,
+          WritableUtils.writeToByteArray(workerInfo),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.EPHEMERAL,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("registerHealth: myHealthPath already exists (likely " +
+          "from previous failure): " + myHealthPath +
+          ".  Waiting for change in attempts " +
+          "to re-join the application");
+      getApplicationAttemptChangedEvent().waitForever();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("registerHealth: Got application " +
+            "attempt changed event, killing self");
+      }
+      throw new IllegalStateException(
+          "registerHealth: Trying " +
+              "to get the new application attempt by killing self", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with InterruptedException", e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("registerHealth: Created my health node for attempt=" +
+          getApplicationAttempt() + ", superstep=" +
+          getSuperstep() + " with " + myHealthZnode +
+          " and workerInfo= " + workerInfo);
+    }
+  }
+
+  /**
+   * Do this to help notify the master more quickly that this worker has failed.
+   */
+  private void unregisterHealth() {
+    LOG.error("unregisterHealth: Got failure, unregistering health on " +
+        myHealthZnode + " on superstep " + getSuperstep());
+    try {
+      getZkExt().deleteExt(myHealthZnode, -1, false);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: InterruptedException - Couldn't delete " +
+              myHealthZnode, e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: KeeperException - Couldn't delete " +
+              myHealthZnode, e);
+    }
+  }
+
+  @Override
+  public void failureCleanup() {
+    unregisterHealth();
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> startSuperstep() {
+    // Algorithm:
+    // 1. Communication service will combine messages from the previous
+    //    superstep
+    // 2. Register my health for the next superstep.
+    // 3. Wait until the partition assignment is complete and get it
+    // 4. Get the aggregator values from the previous superstep
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      workerServer.prepareSuperstep();
+    }
+
+    registerHealth(getSuperstep());
+
+    String addressesAndPartitionsPath =
+        getAddressesAndPartitionsPath(getApplicationAttempt(),
+            getSuperstep());
+    AddressesAndPartitionsWritable addressesAndPartitions =
+        new AddressesAndPartitionsWritable(
+            workerGraphPartitioner.createPartitionOwner().getClass());
+    try {
+      while (getZkExt().exists(addressesAndPartitionsPath, true) ==
+          null) {
+        getAddressesAndPartitionsReadyChangedEvent().waitForever();
+        getAddressesAndPartitionsReadyChangedEvent().reset();
+      }
+      WritableUtils.readFieldsFromZnode(
+          getZkExt(),
+          addressesAndPartitionsPath,
+          false,
+          null,
+          addressesAndPartitions);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "startSuperstep: KeeperException getting assignments", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "startSuperstep: InterruptedException getting assignments", e);
+    }
+
+    workerInfoList.clear();
+    workerInfoList = addressesAndPartitions.getWorkerInfos();
+    masterInfo = addressesAndPartitions.getMasterInfo();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("startSuperstep: " + masterInfo);
+      LOG.info("startSuperstep: Ready for computation on superstep " +
+          getSuperstep() + " since worker " +
+          "selection and vertex range assignments are done in " +
+          addressesAndPartitionsPath);
+    }
+
+    getContext().setStatus("startSuperstep: " +
+        getGraphTaskManager().getGraphFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+    return addressesAndPartitions.getPartitionOwners();
+  }
+
+  @Override
+  public FinishedSuperstepStats finishSuperstep(
+      List<PartitionStats> partitionStatsList) {
+    // This barrier blocks until success (or the master signals it to
+    // restart).
+    //
+    // Master will coordinate the barriers and aggregate "doneness" of all
+    // the vertices.  Each worker will:
+    // 1. Ensure that the requests are complete
+    // 2. Execute user postSuperstep() if necessary.
+    // 3. Save aggregator values that are in use.
+    // 4. Report the statistics (vertices, edges, messages, etc.)
+    //    of this worker
+    // 5. Let the master know it is finished.
+    // 6. Wait for the master's superstep info, and check if done
+    waitForRequestsToFinish();
+
+    getGraphTaskManager().notifyFinishedCommunication();
+
+    long workerSentMessages = 0;
+    long workerSentMessageBytes = 0;
+    long localVertices = 0;
+    for (PartitionStats partitionStats : partitionStatsList) {
+      workerSentMessages += partitionStats.getMessagesSentCount();
+      workerSentMessageBytes += partitionStats.getMessageBytesSentCount();
+      localVertices += partitionStats.getVertexCount();
+    }
+
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      postSuperstepCallbacks();
+    }
+
+    aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
+          ", messages = " + workerSentMessages + " " +
+          ", message bytes = " + workerSentMessageBytes + " , " +
+          MemoryUtils.getRuntimeMemoryStats());
+    }
+
+    writeFinishedSuperstepInfoToZK(partitionStatsList,
+      workerSentMessages, workerSentMessageBytes);
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "finishSuperstep: (waiting for rest " +
+            "of workers) " +
+            getGraphTaskManager().getGraphFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep());
+
+    String superstepFinishedNode =
+        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
+
+    waitForOtherWorkers(superstepFinishedNode);
+
+    GlobalStats globalStats = new GlobalStats();
+    SuperstepClasses superstepClasses = new SuperstepClasses();
+    WritableUtils.readFieldsFromZnode(
+        getZkExt(), superstepFinishedNode, false, null, globalStats,
+        superstepClasses);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
+          " with global stats " + globalStats + " and classes " +
+          superstepClasses);
+    }
+    incrCachedSuperstep();
+    getContext().setStatus("finishSuperstep: (all workers done) " +
+        getGraphTaskManager().getGraphFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+    getConfiguration().updateSuperstepClasses(superstepClasses);
+
+    return new FinishedSuperstepStats(
+        localVertices,
+        globalStats.getHaltComputation(),
+        globalStats.getVertexCount(),
+        globalStats.getEdgeCount(),
+        false);
+  }
+
+  /**
+   * Handle post-superstep callbacks
+   */
+  private void postSuperstepCallbacks() {
+    GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
+    getWorkerContext().postSuperstep();
+    timerContext.stop();
+    getContext().progress();
+
+    for (WorkerObserver obs : getWorkerObservers()) {
+      obs.postSuperstep(getSuperstep());
+      getContext().progress();
+    }
+  }
+
+  /**
+   * Wait for all the requests to finish.
+   */
+  private void waitForRequestsToFinish() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Waiting on all requests, superstep " +
+          getSuperstep() + " " +
+          MemoryUtils.getRuntimeMemoryStats());
+    }
+    GiraphTimerContext timerContext = waitRequestsTimer.time();
+    workerClient.waitAllRequests();
+    timerContext.stop();
+  }
+
+  /**
+   * Wait for all the other Workers to finish the superstep.
+   *
+   * @param superstepFinishedNode ZooKeeper path to wait on.
+   */
+  private void waitForOtherWorkers(String superstepFinishedNode) {
+    try {
+      while (getZkExt().exists(superstepFinishedNode, true) == null) {
+        getSuperstepFinishedEvent().waitForever();
+        getSuperstepFinishedEvent().reset();
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+              "signal completion of superstep " + getSuperstep(), e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+              "signal completion of superstep " + getSuperstep(), e);
+    }
+  }
+
+  /**
+   * Write finished superstep info to ZooKeeper.
+   *
+   * @param partitionStatsList List of partition stats from superstep.
+   * @param workerSentMessages Number of messages sent in superstep.
+   * @param workerSentMessageBytes Number of message bytes sent
+   *                               in superstep.
+   */
+  private void writeFinishedSuperstepInfoToZK(
+      List<PartitionStats> partitionStatsList, long workerSentMessages,
+      long workerSentMessageBytes) {
+    Collection<PartitionStats> finalizedPartitionStats =
+        workerGraphPartitioner.finalizePartitionStats(
+            partitionStatsList, getPartitionStore());
+    List<PartitionStats> finalizedPartitionStatsList =
+        new ArrayList<PartitionStats>(finalizedPartitionStats);
+    byte[] partitionStatsBytes =
+        WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
+    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
+    metrics.readFromRegistry();
+    byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);
+
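+    // The finished-worker znode payload is a small JSON object keyed by the
+    // JSONOBJ_* constants: base64-encoded partition stats and metrics plus
+    // plain message and message-byte counts (see the put() calls below).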
+    JSONObject workerFinishedInfoObj = new JSONObject();
+    try {
+      workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
+          Base64.encodeBytes(partitionStatsBytes));
+      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
+      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
+        workerSentMessageBytes);
+      workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
+          Base64.encodeBytes(metricsBytes));
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+
+    String finishedWorkerPath =
+        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
+        "/" + getHostnamePartitionId();
+    try {
+      getZkExt().createExt(finishedWorkerPath,
+          workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("finishSuperstep: finished worker path " +
+          finishedWorkerPath + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with InterruptedException", e);
+    }
+  }
+
+  /**
+   * Save the vertices from this worker's partitions using the
+   * user-defined VertexOutputFormat.
+   *
+   * @param numLocalVertices Number of local vertices
+   * @throws InterruptedException
+   */
+  private void saveVertices(long numLocalVertices) throws IOException,
+      InterruptedException {
+    if (getConfiguration().getVertexOutputFormatClass() == null) {
+      LOG.warn("saveVertices: " +
+          GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
+          " not specified -- there will be no saved output");
+      return;
+    }
+    if (getConfiguration().doOutputDuringComputation()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("saveVertices: The option for doing output during " +
+            "computation is selected, so there will be no saving of the " +
+            "output in the end of application");
+      }
+      return;
+    }
+
+    final int numPartitions = getPartitionStore().getNumPartitions();
+    int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
+        numPartitions);
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveVertices: Starting to save " + numLocalVertices + " vertices " +
+            "using " + numThreads + " threads");
+    final VertexOutputFormat<I, V, E> vertexOutputFormat =
+        getConfiguration().createWrappedVertexOutputFormat();
+
+    final Queue<Integer> partitionIdQueue =
+        (numPartitions == 0) ? new LinkedList<Integer>() :
+            new ArrayBlockingQueue<Integer>(numPartitions);
+    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            VertexWriter<I, V, E> vertexWriter =
+                vertexOutputFormat.createVertexWriter(getContext());
+            vertexWriter.setConf(getConfiguration());
+            vertexWriter.initialize(getContext());
+            long nextPrintVertices = 0;
+            long nextPrintMsecs = System.currentTimeMillis() + 15000;
+            int partitionIndex = 0;
+            int numPartitions = getPartitionStore().getNumPartitions();
+            while (!partitionIdQueue.isEmpty()) {
+              Integer partitionId = partitionIdQueue.poll();
+              if (partitionId == null) {
+                break;
+              }
+
+              Partition<I, V, E> partition =
+                  getPartitionStore().getPartition(partitionId);
+              long verticesWritten = 0;
+              for (Vertex<I, V, E> vertex : partition) {
+                vertexWriter.writeVertex(vertex);
+                ++verticesWritten;
+
+                // Update status at most once every 250k vertices and 15 seconds
+                if (verticesWritten > nextPrintVertices &&
+                    System.currentTimeMillis() > nextPrintMsecs) {
+                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+                      "saveVertices: Saved " + verticesWritten + " out of " +
+                          partition.getVertexCount() + " partition vertices, " +
+                          "on partition " + partitionIndex +
+                          " out of " + numPartitions);
+                  nextPrintMsecs = System.currentTimeMillis() + 15000;
+                  nextPrintVertices = verticesWritten + 250000;
+                }
+              }
+              getPartitionStore().putPartition(partition);
+              ++partitionIndex;
+            }
+            vertexWriter.close(getContext()); // the temp results are saved now
+            return null;
+          }
+        };
+      }
+    };
+    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+        "save-vertices-%d", getContext());
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+      "saveVertices: Done saving vertices.");
+    // YARN: we must commit the "task" output ourselves since Hadoop
+    // isn't there to do it.
+    if (getConfiguration().isPureYarnJob() &&
+      getConfiguration().getVertexOutputFormatClass() != null) {
+      try {
+        OutputCommitter outputCommitter =
+          vertexOutputFormat.getOutputCommitter(getContext());
+        if (outputCommitter.needsTaskCommit(getContext())) {
+          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+            "OutputCommitter: committing task output.");
+          // transfer from temp dirs to "task commit" dirs to prep for
+          // the master's OutputCommitter#commitJob(context) call to finish.
+          outputCommitter.commitTask(getContext());
+        }
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted while attempting to obtain " +
+          "OutputCommitter.", ie);
+      } catch (IOException ioe) {
+        LOG.error("Master task's attempt to commit output has " +
+          "FAILED.", ioe);
+      }
+    }
+  }
+
+  @Override
+  public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
+    throws IOException, InterruptedException {
+    workerClient.closeConnections();
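+    // The cached superstep was already advanced by incrCachedSuperstep()
+    // in the final finishSuperstep(), so step back to the last superstep
+    // that actually ran before saving the output.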
+    setCachedSuperstep(getSuperstep() - 1);
+    saveVertices(finishedSuperstepStats.getLocalVertexCount());
+    getPartitionStore().shutdown();
+    // All worker processes should denote that they are done by adding a
+    // special znode.  Once the number of znodes equals the number of
+    // partitions for workers and masters, the master will clean up the
+    // ZooKeeper znodes associated with this job.
+    String workerCleanedUpPath = cleanedUpPath  + "/" +
+        getTaskPartition() + WORKER_SUFFIX;
+    try {
+      String finalFinishedPath =
+          getZkExt().createExt(workerCleanedUpPath,
+              null,
+              Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT,
+              true);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Notifying master its okay to cleanup with " +
+            finalFinishedPath);
+      }
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Couldn't create finished node '" +
+            workerCleanedUpPath);
+      }
+    } catch (KeeperException e) {
+      // Cleanup is already done, so it's okay if this notification fails
+      LOG.error("cleanup: Got KeeperException on notification " +
+          "to master about cleanup", e);
+    } catch (InterruptedException e) {
+      // Cleanup is already done, so it's okay if this notification fails
+      LOG.error("cleanup: Got InterruptedException on notification " +
+          "to master about cleanup", e);
+    }
+    try {
+      getZkExt().close();
+    } catch (InterruptedException e) {
+      // cleanup phase -- just log the error
+      LOG.error("cleanup: Zookeeper failed to close with " + e);
+    }
+
+    if (getConfiguration().metricsEnabled()) {
+      GiraphMetrics.get().dumpToStream(System.err);
+    }
+
+    // Ideally we would shut down the service only after all clients
+    // have disconnected (or the exceptions on the client side are
+    // ignored).
+    workerServer.close();
+  }
+
+  @Override
+  public void storeCheckpoint() throws IOException {
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "storeCheckpoint: Starting checkpoint " +
+            getGraphTaskManager().getGraphFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep());
+
+    // Algorithm:
+    // For each partition, dump vertices and messages
+    Path metadataFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_METADATA_POSTFIX);
+    Path verticesFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_VERTICES_POSTFIX);
+    Path validFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_VALID_POSTFIX);
+
+    // Remove these files if they already exist (they shouldn't, unless
+    // this worker previously failed)
+    if (getFs().delete(validFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed valid file " +
+          validFilePath);
+    }
+    if (getFs().delete(metadataFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed metadata file " +
+          metadataFilePath);
+    }
+    if (getFs().delete(verticesFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
+    }
+
+    FSDataOutputStream verticesOutputStream =
+        getFs().create(verticesFilePath);
+    ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
+    DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
+    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+      Partition<I, V, E> partition =
+          getPartitionStore().getPartition(partitionId);
+      long startPos = verticesOutputStream.getPos();
+      partition.write(verticesOutputStream);
+      // write messages
+      getServerData().getCurrentMessageStore().writePartition(
+          verticesOutputStream, partition.getId());
+      // Write the metadata for this partition
+      // Format:
+      // <index count>
+      //   <index 0 start pos><partition id>
+      //   <index 1 start pos><partition id>
+      metadataOutput.writeLong(startPos);
+      metadataOutput.writeInt(partition.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("storeCheckpoint: Vertex file starting " +
+            "offset = " + startPos + ", length = " +
+            (verticesOutputStream.getPos() - startPos) +
+            ", partition = " + partition.toString());
+      }
+      getPartitionStore().putPartition(partition);
+      getContext().progress();
+    }
+    // Metadata is buffered and written at the end since it's small and
+    // needs to know how many partitions this worker owns
+    FSDataOutputStream metadataOutputStream =
+        getFs().create(metadataFilePath);
+    metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
+    metadataOutputStream.write(metadataByteStream.toByteArray());
+    metadataOutputStream.close();
+    verticesOutputStream.close();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("storeCheckpoint: Finished metadata (" +
+          metadataFilePath + ") and vertices (" + verticesFilePath + ").");
+    }
+
+    getFs().createNewFile(validFilePath);
+
+    // Notify master that checkpoint is stored
+    String workerWroteCheckpoint =
+        getWorkerWroteCheckpointPath(getApplicationAttempt(),
+            getSuperstep()) + "/" + getHostnamePartitionId();
+    try {
+      getZkExt().createExt(workerWroteCheckpoint,
+          new byte[0],
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("storeCheckpoint: wrote checkpoint worker path " +
+          workerWroteCheckpoint + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " +
+          workerWroteCheckpoint +
+          " failed with InterruptedException", e);
+    }
+  }
+
+  @Override
+  public VertexEdgeCount loadCheckpoint(long superstep) {
+    try {
+      // clear old message stores
+      getServerData().getIncomingMessageStore().clearAll();
+      getServerData().getCurrentMessageStore().clearAll();
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "loadCheckpoint: Failed to clear message stores ", e);
+    }
+
+    // Algorithm:
+    // Examine all the partition owners and load the ones
+    // that match my hostname and id from the master-designated checkpoint
+    // prefixes.
+    long startPos = 0;
+    int loadedPartitions = 0;
+    for (PartitionOwner partitionOwner :
+      workerGraphPartitioner.getPartitionOwners()) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
+        String metadataFile =
+            partitionOwner.getCheckpointFilesPrefix() +
+            CHECKPOINT_METADATA_POSTFIX;
+        String partitionsFile =
+            partitionOwner.getCheckpointFilesPrefix() +
+            CHECKPOINT_VERTICES_POSTFIX;
+        try {
+          int partitionId = -1;
+          DataInputStream metadataStream =
+              getFs().open(new Path(metadataFile));
+          int partitions = metadataStream.readInt();
+          for (int i = 0; i < partitions; ++i) {
+            startPos = metadataStream.readLong();
+            partitionId = metadataStream.readInt();
+            if (partitionId == partitionOwner.getPartitionId()) {
+              break;
+            }
+          }
+          if (partitionId != partitionOwner.getPartitionId()) {
+            throw new IllegalStateException(
+                "loadCheckpoint: " + partitionOwner +
+                " not found!");
+          }
+          metadataStream.close();
+          Partition<I, V, E> partition =
+              getConfiguration().createPartition(partitionId, getContext());
+          DataInputStream partitionsStream =
+              getFs().open(new Path(partitionsFile));
+          if (partitionsStream.skip(startPos) != startPos) {
+            throw new IllegalStateException(
+                "loadCheckpoint: Failed to skip " + startPos +
+                " on " + partitionsFile);
+          }
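+          // (InputStream#skip may legally skip fewer bytes than requested;
+          // a short skip is treated here as a corrupt or truncated
+          // checkpoint file.)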
+          partition.readFields(partitionsStream);
+          getServerData().getIncomingMessageStore().readFieldsForPartition(
+              partitionsStream, partitionId);
+          partitionsStream.close();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("loadCheckpoint: Loaded partition " +
+                partition);
+          }
+          if (getPartitionStore().hasPartition(partitionId)) {
+            throw new IllegalStateException(
+                "loadCheckpoint: Already has partition owner " +
+                    partitionOwner);
+          }
+          getPartitionStore().addPartition(partition);
+          getContext().progress();
+          ++loadedPartitions;
+        } catch (IOException e) {
+          throw new RuntimeException(
+              "loadCheckpoint: Failed to get partition owner " +
+                  partitionOwner, e);
+        }
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
+          " partitions of out " +
+          workerGraphPartitioner.getPartitionOwners().size() +
+          " total.");
+    }
+
+    // Load global stats and superstep classes
+    GlobalStats globalStats = new GlobalStats();
+    SuperstepClasses superstepClasses = new SuperstepClasses();
+    String finalizedCheckpointPath =
+        getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+    try {
+      DataInputStream finalizedStream =
+          getFs().open(new Path(finalizedCheckpointPath));
+      globalStats.readFields(finalizedStream);
+      superstepClasses.readFields(finalizedStream);
+      getConfiguration().updateSuperstepClasses(superstepClasses);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "loadCheckpoint: Failed to load global stats and superstep classes",
+          e);
+    }
+
+    getServerData().prepareSuperstep();
+    // Communication service needs to setup the connections prior to
+    // processing vertices
+/*if[HADOOP_NON_SECURE]
+    workerClient.setup();
+else[HADOOP_NON_SECURE]*/
+    workerClient.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
+    return new VertexEdgeCount(globalStats.getVertexCount(),
+        globalStats.getEdgeCount());
+  }
+
+  /**
+   * Send the worker partitions to their destination workers
+   *
+   * @param workerPartitionMap Map of worker info to the partitions stored
+   *        on this worker to be sent
+   */
+  private void sendWorkerPartitions(
+      Map<WorkerInfo, List<Integer>> workerPartitionMap) {
+    List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
+        new ArrayList<Entry<WorkerInfo, List<Integer>>>(
+            workerPartitionMap.entrySet());
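+    // Shuffling the destination order spreads the partition transfers so
+    // that all senders do not hit the same destination worker first.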
+    Collections.shuffle(randomEntryList);
+    WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
+            getConfiguration(), this);
+    for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
+      randomEntryList) {
+      for (Integer partitionId : workerPartitionList.getValue()) {
+        Partition<I, V, E> partition =
+            getPartitionStore().removePartition(partitionId);
+        if (partition == null) {
+          throw new IllegalStateException(
+              "sendWorkerPartitions: Couldn't find partition " +
+                  partitionId + " to send to " +
+                  workerPartitionList.getKey());
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("sendWorkerPartitions: Sending worker " +
+              workerPartitionList.getKey() + " partition " +
+              partitionId);
+        }
+        workerClientRequestProcessor.sendPartitionRequest(
+            workerPartitionList.getKey(),
+            partition);
+      }
+    }
+
+    try {
+      workerClientRequestProcessor.flush();
+      workerClient.waitAllRequests();
+    } catch (IOException e) {
+      throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
+    }
+    String myPartitionExchangeDonePath =
+        getPartitionExchangeWorkerPath(
+            getApplicationAttempt(), getSuperstep(), getWorkerInfo());
+    try {
+      getZkExt().createExt(myPartitionExchangeDonePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "sendWorkerPartitions: KeeperException to create " +
+              myPartitionExchangeDonePath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "sendWorkerPartitions: InterruptedException to create " +
+              myPartitionExchangeDonePath, e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("sendWorkerPartitions: Done sending all my partitions.");
+    }
+  }
+
+  @Override
+  public final void exchangeVertexPartitions(
+      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
+    // 1. Fix the addresses of the partition ids if they have changed.
+    // 2. Send all the partitions to their destination workers in a random
+    //    fashion.
+    // 3. Notify completion with a ZooKeeper stamp
+    // 4. Wait for all my dependencies to be done (if any)
+    // 5. Add the partitions to myself.
+    PartitionExchange partitionExchange =
+        workerGraphPartitioner.updatePartitionOwners(
+            getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+    workerClient.openConnections();
+
+    Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
+        partitionExchange.getSendWorkerPartitionMap();
+    if (!getPartitionStore().isEmpty()) {
+      sendWorkerPartitions(sendWorkerPartitionMap);
+    }
+
+    Set<WorkerInfo> myDependencyWorkerSet =
+        partitionExchange.getMyDependencyWorkerSet();
+    Set<String> workerIdSet = new HashSet<String>();
+    for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
+      if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
+        throw new IllegalStateException(
+            "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
+      }
+    }
+    if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
+            "exiting early");
+      }
+      return;
+    }
+
+    String vertexExchangePath =
+        getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
+    List<String> workerDoneList;
+    try {
+      while (true) {
+        workerDoneList = getZkExt().getChildrenExt(
+            vertexExchangePath, true, false, false);
+        workerIdSet.removeAll(workerDoneList);
+        if (workerIdSet.isEmpty()) {
+          break;
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("exchangeVertexPartitions: Waiting for workers " +
+              workerIdSet);
+        }
+        getPartitionExchangeChildrenChangedEvent().waitForever();
+        getPartitionExchangeChildrenChangedEvent().reset();
+      }
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("exchangeVertexPartitions: Done with exchange.");
+    }
+  }
+
+  /**
+   * Get event when the state of a partition exchange has changed.
+   *
+   * @return Event to check.
+   */
+  public final BspEvent getPartitionExchangeChildrenChangedEvent() {
+    return partitionExchangeChildrenChanged;
+  }
+
+  @Override
+  protected boolean processEvent(WatchedEvent event) {
+    boolean foundEvent = false;
+    if (event.getPath().startsWith(masterJobStatePath) &&
+        (event.getType() == EventType.NodeChildrenChanged)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("processEvent: Job state changed, checking " +
+            "to see if it needs to restart");
+      }
+      JSONObject jsonObj = getJobState();
+      // In YARN, we have to manually commit our own output in 2 stages, which
+      // we do not have to do in Hadoop-based Giraph. So jsonObj can be null.
+      if (getConfiguration().isPureYarnJob() && null == jsonObj) {
+        LOG.error("BspServiceWorker#getJobState() came back NULL.");
+        return false; // the event has been processed.
+      }
+      try {
+        if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
+            ApplicationState.START_SUPERSTEP) &&
+            jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
+            getApplicationAttempt()) {
+          LOG.fatal("processEvent: Worker will restart " +
+              "from command - " + jsonObj.toString());
+          System.exit(-1);
+        }
+      } catch (JSONException e) {
+        throw new RuntimeException(
+            "processEvent: Couldn't properly get job state from " +
+                jsonObj.toString(), e);
+      }
+      foundEvent = true;
+    } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("processEvent : partitionExchangeChildrenChanged " +
+            "(at least one worker is done sending partitions)");
+      }
+      partitionExchangeChildrenChanged.signal();
+      foundEvent = true;
+    }
+
+    return foundEvent;
+  }
+
+  @Override
+  public WorkerInfo getWorkerInfo() {
+    return workerInfo;
+  }
+
+  @Override
+  public PartitionStore<I, V, E> getPartitionStore() {
+    return getServerData().getPartitionStore();
+  }
+
+  @Override
+  public PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return workerGraphPartitioner.getPartitionOwner(vertexId);
+  }
+
+  @Override
+  public Iterable<? extends PartitionOwner> getPartitionOwners() {
+    return workerGraphPartitioner.getPartitionOwners();
+  }
+
+  @Override
+  public int getPartitionId(I vertexId) {
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
+    return partitionOwner.getPartitionId();
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return getPartitionStore().hasPartition(partitionId);
+  }
+
+  @Override
+  public ServerData<I, V, E> getServerData() {
+    return workerServer.getServerData();
+  }
+
+  @Override
+  public WorkerAggregatorHandler getAggregatorHandler() {
+    return aggregatorHandler;
+  }
+
+  @Override
+  public void prepareSuperstep() {
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+    }
+  }
+
+  @Override
+  public SuperstepOutput<I, V, E> getSuperstepOutput() {
+    return superstepOutput;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/test/java/org/apache/giraph/io/TestSrcIdDstIdEdgeValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestSrcIdDstIdEdgeValueTextOutputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestSrcIdDstIdEdgeValueTextOutputFormat.java
new file mode 100644
index 0000000..9909871
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestSrcIdDstIdEdgeValueTextOutputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE;
+import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.formats.SrcIdDstIdEdgeValueTextOutputFormat;
+import org.apache.giraph.utils.NoOpComputation;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSrcIdDstIdEdgeValueTextOutputFormat
+  extends SrcIdDstIdEdgeValueTextOutputFormat<LongWritable,
+          LongWritable, LongWritable> {
+  /** Test configuration */
+  private ImmutableClassesGiraphConfiguration<
+      LongWritable, LongWritable, LongWritable> conf;
+  /**
+   * Dummy class to allow ImmutableClassesGiraphConfiguration to be created.
+   */
+  public static class DummyComputation extends NoOpComputation<Text,
+      DoubleWritable, DoubleWritable, DoubleWritable> { }
+
+  @Before
+  public void setUp() {
+    GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
+    giraphConfiguration.setComputationClass(DummyComputation.class);
+    conf =
+      new ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
+        LongWritable>(giraphConfiguration);
+  }
+
+  @Test
+  public void testHappyPath() throws IOException, InterruptedException {
+    Text expected = new Text("0\t1\t5");
+
+    checkSrcIdDstIdEdgeValueWorker(expected);
+  }
+
+  @Test
+  public void testReverseIdAndValue() throws IOException, InterruptedException {
+    GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE.set(this.conf, true);
+    Text expected = new Text("5\t1\t0");
+
+    checkSrcIdDstIdEdgeValueWorker(expected);
+  }
+
+  @Test
+  public void testWithDifferentDelimiter() throws IOException,
+      InterruptedException {
+    GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR.set(this.conf, "->");
+    Text expected = new Text("0->1->5");
+
+    checkSrcIdDstIdEdgeValueWorker(expected);
+  }
+
+  private void checkSrcIdDstIdEdgeValueWorker(Text expected)
+    throws IOException, InterruptedException {
+
+    TaskAttemptContext tac = mock(TaskAttemptContext.class);
+    when(tac.getConfiguration()).thenReturn(conf);
+
+    Edge edge = mock(Edge.class);
+
+    when(edge.getTargetVertexId()).thenReturn(new LongWritable(1));
+    when(edge.getValue()).thenReturn(new LongWritable(5));
+
+    final RecordWriter<Text, Text> tw = mock(RecordWriter.class);
+    SrcIdDstIdEdgeValueEdgeWriter writer = new SrcIdDstIdEdgeValueEdgeWriter() {
+      @Override
+      protected RecordWriter<Text, Text> createLineRecordWriter(
+        TaskAttemptContext context) throws IOException, InterruptedException {
+
+        return tw;
+      }
+    };
+
+    writer.setConf(conf);
+    writer.initialize(tac);
+    writer.writeEdge(new LongWritable(0), new LongWritable(0), edge);
+
+    verify(tw).write(expected, null);
+  }
+}


[2/2] git commit: updated refs/heads/trunk to ae01f03

Posted by cl...@apache.org.
GIRAPH-732


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ae01f039
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ae01f039
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ae01f039

Branch: refs/heads/trunk
Commit: ae01f0399cae6baab045b2ece0d71096aebe8ca3
Parents: fa6b754
Author: Claudio Martella <cl...@apache.org>
Authored: Mon Aug 26 22:15:18 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Mon Aug 26 22:15:18 2013 +0200

----------------------------------------------------------------------
 CHANGELOG                                       |    2 +
 .../java/org/apache/giraph/GiraphRunner.java    |    2 +-
 .../org/apache/giraph/conf/GiraphClasses.java   |   25 +
 .../apache/giraph/conf/GiraphConfiguration.java |   57 +
 .../org/apache/giraph/conf/GiraphConstants.java |   23 +
 .../ImmutableClassesGiraphConfiguration.java    |   45 +
 .../org/apache/giraph/io/EdgeOutputFormat.java  |   82 +
 .../java/org/apache/giraph/io/EdgeWriter.java   |   74 +
 .../io/formats/GiraphTextOutputFormat.java      |   90 +
 .../io/formats/IdWithValueTextOutputFormat.java |   19 +-
 .../SrcIdDstIdEdgeValueTextOutputFormat.java    |   91 ++
 .../giraph/io/formats/TextEdgeOutputFormat.java |  165 ++
 .../io/formats/TextVertexOutputFormat.java      |   15 +-
 .../io/internal/WrappedEdgeOutputFormat.java    |  169 ++
 .../apache/giraph/utils/ConfigurationUtils.java |   53 +-
 .../apache/giraph/worker/BspServiceWorker.java  |  124 +-
 .../giraph/worker/BspServiceWorker.java.orig    | 1535 ++++++++++++++++++
 ...TestSrcIdDstIdEdgeValueTextOutputFormat.java |  114 ++
 18 files changed, 2659 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index af43ef8..deca52b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-732: EdgeOutputFormat (aarmax00 via claudio)
+
   GIRAPH-512: JavaDoc warnings (tdn120 via nitay)
 
   GIRAPH-736: Bring back FindBugs (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
index 1bd79b5..9af50e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
@@ -102,7 +102,7 @@ public class GiraphRunner implements Tool {
    */
   private void prepareHadoopMRJob(final GiraphJob job, final CommandLine cmd)
     throws Exception {
-    if (cmd.hasOption("of")) {
+    if (cmd.hasOption("vof") || cmd.hasOption("eof")) {
       if (cmd.hasOption("op")) {
         FileOutputFormat.setOutputPath(job.getInternalJob(),
           new Path(cmd.getOptionValue("op")));

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 71fe885..f97446f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -28,6 +28,7 @@ import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -85,6 +86,9 @@ public class GiraphClasses<I extends WritableComparable,
   /** Edge input format class - cached for fast access */
   protected Class<? extends EdgeInputFormat<I, E>>
   edgeInputFormatClass;
+  /** Edge output format class - cached for fast access */
+  protected Class<? extends EdgeOutputFormat<I, V, E>>
+  edgeOutputFormatClass;
 
   /** Aggregator writer class - cached for fast access */
   protected Class<? extends AggregatorWriter> aggregatorWriterClass;
@@ -168,6 +172,8 @@ public class GiraphClasses<I extends WritableComparable,
         VERTEX_OUTPUT_FORMAT_CLASS.get(conf);
     edgeInputFormatClass = (Class<? extends EdgeInputFormat<I, E>>)
         EDGE_INPUT_FORMAT_CLASS.get(conf);
+    edgeOutputFormatClass = (Class<? extends EdgeOutputFormat<I, V, E>>)
+        EDGE_OUTPUT_FORMAT_CLASS.get(conf);
 
     aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
     combinerClass = (Class<? extends Combiner<I, ? extends Writable>>)
@@ -347,6 +353,25 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Check if EdgeOutputFormat is set
+   *
+   * @return true if EdgeOutputFormat is set
+   */
+  public boolean hasEdgeOutputFormat() {
+    return edgeOutputFormatClass != null;
+  }
+
+  /**
+   * Get EdgeOutputFormat set
+   *
+   * @return EdgeOutputFormat
+   */
+  public Class<? extends EdgeOutputFormat<I, V, E>>
+  getEdgeOutputFormatClass() {
+    return edgeOutputFormatClass;
+  }
+
+  /**
    * Check if AggregatorWriter is set
    *
    * @return true if AggregatorWriter is set

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 23bcd32..15ff861 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.EdgeInputFilter;
@@ -345,6 +346,25 @@ public class GiraphConfiguration extends Configuration
     VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
   }
 
+
+  /**
+   * Does the job have a {@link VertexOutputFormat} subdir?
+   *
+   * @return True iff a {@link VertexOutputFormat} subdir has been specified.
+   */
+  public boolean hasVertexOutputFormatSubdir() {
+    return !VERTEX_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
+  }
+
+  /**
+   * Set the vertex output format path
+   *
+   * @param path path where the vertices will be written
+   */
+  public final void setVertexOutputFormatSubdir(String path) {
+    VERTEX_OUTPUT_FORMAT_SUBDIR.set(this, path);
+  }
+
   /**
    * Check if output should be done during computation
    *
@@ -386,6 +406,43 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Does the job have a {@link EdgeOutputFormat}?
+   *
+   * @return True iff a {@link EdgeOutputFormat} has been specified.
+   */
+  public boolean hasEdgeOutputFormat() {
+    return EDGE_OUTPUT_FORMAT_CLASS.get(this) != null;
+  }
+
+  /**
+   * Set the edge output format class (optional)
+   *
+   * @param edgeOutputFormatClass Determines how graph is output
+   */
+  public final void setEdgeOutputFormatClass(
+      Class<? extends EdgeOutputFormat> edgeOutputFormatClass) {
+    EDGE_OUTPUT_FORMAT_CLASS.set(this, edgeOutputFormatClass);
+  }
+
+  /**
+   * Does the job have a {@link EdgeOutputFormat} subdir?
+   *
+   * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
+   */
+  public boolean hasEdgeOutputFormatSubdir() {
+    return !EDGE_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
+  }
+
+  /**
+   * Set the edge output format path
+   *
+   * @param path path where the edges will be written
+   */
+  public final void setEdgeOutputFormatSubdir(String path) {
+    EDGE_OUTPUT_FORMAT_SUBDIR.set(this, path);
+  }
+
+  /**
    * Get the number of threads to use for writing output in the end of the
    * application. If output format is not thread safe, returns 1.
    *

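The new setters above are meant to be used together when a job writes both
vertices and edges under a single output path. A minimal sketch (the
computation class is a placeholder, not part of this patch):

    GiraphConfiguration conf = new GiraphConfiguration();
    // Hypothetical computation class, used only for illustration.
    conf.setComputationClass(MyComputation.class);
    // Separate formats for vertex and edge output...
    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
    conf.setEdgeOutputFormatClass(SrcIdDstIdEdgeValueTextOutputFormat.class);
    // ...and separate subdirectories under the single output path,
    // so the two outputs cannot clash.
    conf.setVertexOutputFormatSubdir("vertices");
    conf.setEdgeOutputFormatSubdir("edges");
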
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c276c2a..604729a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -38,6 +38,7 @@ import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -210,6 +211,28 @@ public interface GiraphConstants {
   ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
       ClassConfOption.create("giraph.vertexOutputFormatClass", null,
           VertexOutputFormat.class, "VertexOutputFormat class");
+  /** VertexOutputFormat sub-directory */
+  StrConfOption VERTEX_OUTPUT_FORMAT_SUBDIR =
+    new StrConfOption("giraph.vertex.output.subdir", "",
+                      "VertexOutputFormat sub-directory");
+  /** EdgeOutputFormat class */
+  ClassConfOption<EdgeOutputFormat> EDGE_OUTPUT_FORMAT_CLASS =
+      ClassConfOption.create("giraph.edgeOutputFormatClass", null,
+          EdgeOutputFormat.class, "EdgeOutputFormat class");
+  /** EdgeOutputFormat sub-directory */
+  StrConfOption EDGE_OUTPUT_FORMAT_SUBDIR =
+    new StrConfOption("giraph.edge.output.subdir", "edges",
+                      "EdgeOutputFormat sub-directory");
+
+  /** GiraphTextOutputFormat Separator */
+  StrConfOption GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR =
+    new StrConfOption("giraph.textoutputformat.separator", "\t",
+                      "GiraphTextOutputFormat Separator");
+  /** Reverse values in the output */
+  BooleanConfOption GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE =
+      new BooleanConfOption("giraph.textoutputformat.reverse", false,
+                            "Reverse values in the output");
+
   /**
   * If you use this option, instead of saving vertices at the end of the
   * application, saveVertex will be called right after each vertex.compute()

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 49a2ebc..2506c21 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -36,11 +36,13 @@ import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
+import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
 import org.apache.giraph.io.internal.WrappedVertexInputFormat;
 import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
 import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
@@ -289,6 +291,49 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     return wrappedVertexOutputFormat;
   }
 
+  @Override
+  public boolean hasEdgeOutputFormat() {
+    return classes.hasEdgeOutputFormat();
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.io.EdgeOutputFormat}.
+   *
+   * @return User's edge output format class
+   */
+  public Class<? extends EdgeOutputFormat<I, V, E>>
+  getEdgeOutputFormatClass() {
+    return classes.getEdgeOutputFormatClass();
+  }
+
+  /**
+   * Create a user edge output format class.
+   * Note: Giraph should only use WrappedEdgeOutputFormat,
+   * which makes sure that Configuration parameters are set properly.
+   *
+   * @return Instantiated user edge output format class
+   */
+  private EdgeOutputFormat<I, V, E> createEdgeOutputFormat() {
+    Class<? extends EdgeOutputFormat<I, V, E>> klass =
+        getEdgeOutputFormatClass();
+    return ReflectionUtils.newInstance(klass, this);
+  }
+
+  /**
+   * Create a wrapper for user edge output format,
+   * which makes sure that Configuration parameters are set properly in all
+   * methods related to this format.
+   *
+   * @return Wrapper around user edge output format
+   */
+  public WrappedEdgeOutputFormat<I, V, E> createWrappedEdgeOutputFormat() {
+    WrappedEdgeOutputFormat<I, V, E> wrappedEdgeOutputFormat =
+        new WrappedEdgeOutputFormat<I, V, E>(createEdgeOutputFormat());
+    configureIfPossible(wrappedEdgeOutputFormat);
+    return wrappedEdgeOutputFormat;
+  }
+
   /**
    * Create the proper superstep output, based on the configuration settings.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
new file mode 100644
index 0000000..ac4c6ce
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Abstract output format which writes only edges.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeOutputFormat<
+    I extends WritableComparable, V extends Writable,
+    E extends Writable> extends
+    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+  /**
+   * Create an edge writer for a given split. The framework will call
+   * {@link EdgeWriter#initialize(TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param context the information about the task
+   * @return a new edge writer
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract EdgeWriter<I, V, E> createEdgeWriter(
+    TaskAttemptContext context) throws IOException, InterruptedException;
+
+  /**
+   * Check for validity of the output-specification for the job.
+   * (Copied from Hadoop OutputFormat)
+   *
+   * <p>This is to validate the output specification for the job when it is
+   * a job is submitted.  Typically checks that it does not already exist,
+   * throwing an exception when it already exists, so that output is not
+   * overwritten.</p>
+   *
+   * @param  context information about the job
+   * @throws IOException when output should not be attempted
+   */
+  public abstract void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException;
+
+  /**
+   * Get the output committer for this output format. This is responsible
+   * for ensuring the output is committed correctly.
+   * (Copied from Hadoop OutputFormat)
+   *
+   * @param context the task context
+   * @return an output committer
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract OutputCommitter getOutputCommitter(
+    TaskAttemptContext context) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java
new file mode 100644
index 0000000..e5a78c2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Abstract base class used to write edges to an output format.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeWriter<
+    I extends WritableComparable, V extends Writable,
+    E extends Writable> extends
+    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+
+  /**
+   * Writes the next edge and associated source vertex data
+   *
+   * @param   sourceId    the vertex ID from which the edge originates
+   * @param   sourceValue the vertex value; the vertex is the one from which
+   *                      the edge originates
+   * @param   edge        edge to be written
+   * @throws  IOException
+   * @throws  InterruptedException
+   */
+  public abstract void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
+    throws IOException, InterruptedException;
+
+  /**
+   * Use the context to setup writing the edges.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param  context Context used to write the edges.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract void initialize(TaskAttemptContext context)
+    throws IOException, InterruptedException;
+
+  /**
+   * Close this {@link EdgeWriter} to future operations.
+   *
+   * @param  context the context of the task
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract void close(TaskAttemptContext context)
+    throws IOException, InterruptedException;
+}
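The contract above mirrors the vertex-side writer: initialize() is
guaranteed to run first, writeEdge() is then called once per out-edge, and
close() ends the writer's life cycle. A simplified sketch of how a worker
could drive it (the real call sites omit threading, progress reporting and
partition handling; edgeOutputFormat, conf, context and partition are
assumed to be in scope):

    EdgeWriter<I, V, E> edgeWriter = edgeOutputFormat.createEdgeWriter(context);
    edgeWriter.setConf(conf);
    edgeWriter.initialize(context);
    for (Vertex<I, V, E> vertex : partition) {
      for (Edge<I, E> edge : vertex.getEdges()) {
        // One call per out-edge; the source id and value come along
        // so a writer can emit them without a vertex lookup.
        edgeWriter.writeEdge(vertex.getId(), vertex.getValue(), edge);
      }
    }
    edgeWriter.close(context);
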

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
new file mode 100644
index 0000000..582dea2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The text output format used for Giraph text writing.
+ */
+public abstract class GiraphTextOutputFormat
+  extends TextOutputFormat<Text, Text> {
+
+  @Override
+  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
+    throws IOException, InterruptedException {
+    String extension = "";
+    CompressionCodec codec = null;
+    Configuration conf = job.getConfiguration();
+    boolean isCompressed = getCompressOutput(job);
+
+    if (isCompressed) {
+      Class<? extends CompressionCodec> codecClass =
+        getOutputCompressorClass(job, GzipCodec.class);
+      codec =
+        (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+      extension = codec.getDefaultExtension();
+    }
+    Path file = getDefaultWorkFile(job, extension);
+
+    /* adjust the path */
+    FSDataOutputStream fileOut;
+    FileSystem fs = file.getFileSystem(conf);
+    String subdir = getSubdir();
+    if (!subdir.isEmpty()) {
+      Path subdirPath = new Path(subdir);
+      Path subdirAbsPath = new Path(file.getParent(), subdirPath);
+      Path outFile = new Path(subdirAbsPath, file.getName());
+      fileOut = fs.create(outFile, false);
+    } else {
+      fileOut = fs.create(file, false);
+    }
+
+    String separator = "\t";
+
+    if (!isCompressed) {
+      return new LineRecordWriter<Text, Text>(fileOut, separator);
+    } else {
+      DataOutputStream out =
+        new DataOutputStream(codec.createOutputStream(fileOut));
+      return new LineRecordWriter<Text, Text>(out, separator);
+    }
+  }
+
+  /**
+   * Provides an additional path level so that different text outputs are
+   * kept in different directories.
+   *
+   * @return  the subdirectory to be created under the output path
+   */
+  protected abstract String getSubdir();
+}
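The getSubdir() hook is the only deviation from stock TextOutputFormat: it
relocates the work file one directory level down. Assuming an output path of
/out (illustrative), a job writing both graphs would end up with a layout
roughly like:

    /out/vertices/part-m-00000   <- VERTEX_OUTPUT_FORMAT_SUBDIR ("vertices")
    /out/edges/part-m-00000      <- EDGE_OUTPUT_FORMAT_SUBDIR (default "edges")

(part file names are illustrative and depend on the Hadoop task ids).
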

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
index bd69586..e886059 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
@@ -77,19 +77,18 @@ public class IdWithValueTextOutputFormat<I extends WritableComparable,
     @Override
     protected Text convertVertexToLine(Vertex<I, V, E> vertex)
       throws IOException {
-      String first;
-      String second;
+
+      StringBuilder str = new StringBuilder();
       if (reverseOutput) {
-        first = vertex.getValue().toString();
-        second = vertex.getId().toString();
+        str.append(vertex.getValue().toString());
+        str.append(delimiter);
+        str.append(vertex.getId().toString());
       } else {
-        first = vertex.getId().toString();
-        second = vertex.getValue().toString();
+        str.append(vertex.getId().toString());
+        str.append(delimiter);
+        str.append(vertex.getValue().toString());
       }
-      Text line = new Text(first + delimiter + second);
-      return line;
+      return new Text(str.toString());
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
new file mode 100644
index 0000000..1d7478f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR;
+import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE;
+
+/**
+ * Write out Edge Value with Source and Destination ID, but not the vertex
+ * value.
+ * This is a demonstration output format that shows how edges can be output
+ * separately from vertices.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class SrcIdDstIdEdgeValueTextOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextEdgeOutputFormat<I, V, E> {
+
+  @Override
+  public TextEdgeWriter createEdgeWriter(TaskAttemptContext context) {
+    return new SrcIdDstIdEdgeValueEdgeWriter();
+  }
+
+  /**
+   * Edge writer used with {@link SrcIdDstIdEdgeValueTextOutputFormat}.
+   */
+  protected class SrcIdDstIdEdgeValueEdgeWriter
+    extends TextEdgeWriterToEachLine {
+
+    /** Saved delimiter */
+    private String delimiter;
+    /** Cached reverse option */
+    private boolean reverseOutput;
+
+    @Override
+    public void initialize(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(context);
+      delimiter = GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR.get(getConf());
+      reverseOutput = GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE.get(getConf());
+    }
+
+    @Override
+    protected Text convertEdgeToLine(I sourceId, V sourceValue, Edge<I, E> edge)
+      throws IOException {
+      StringBuilder msg = new StringBuilder();
+      if (reverseOutput) {
+        msg.append(edge.getValue().toString());
+        msg.append(delimiter);
+        msg.append(edge.getTargetVertexId().toString());
+        msg.append(delimiter);
+        msg.append(sourceId.toString());
+      } else {
+        msg.append(sourceId.toString());
+        msg.append(delimiter);
+        msg.append(edge.getTargetVertexId().toString());
+        msg.append(delimiter);
+        msg.append(edge.getValue().toString());
+      }
+      return new Text(msg.toString());
+    }
+  }
+}
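The two options read in initialize() make this demonstration format mildly
configurable; a sketch of setting them programmatically (values are
arbitrary, conf is assumed in scope):

    // Use "->" instead of the default tab between fields.
    GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR.set(conf, "->");
    // Emit value, target id, source id instead of source, target, value.
    GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE.set(conf, true);

With both set, an edge from vertex 0 to vertex 1 with value 5 comes out as
5->1->0, consistent with the expectations in the unit test earlier in this
patch.
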

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
new file mode 100644
index 0000000..1b20c57
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import static org.apache.giraph.conf.GiraphConstants.EDGE_OUTPUT_FORMAT_SUBDIR;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * edge output format.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextEdgeOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends EdgeOutputFormat<I, V, E> {
+  /** Uses the TextOutputFormat to do everything */
+  protected GiraphTextOutputFormat textOutputFormat =
+    new GiraphTextOutputFormat() {
+      @Override
+      protected String getSubdir() {
+        return EDGE_OUTPUT_FORMAT_SUBDIR.get(getConf());
+      }
+    };
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    textOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return textOutputFormat.getOutputCommitter(context);
+  }
+
+  /**
+   * The factory method which produces the {@link TextEdgeWriter} used by this
+   * output format.
+   *
+   * @param context  the information about the task
+   * @return         the text edge writer to be used
+   */
+  @Override
+  public abstract TextEdgeWriter createEdgeWriter(TaskAttemptContext
+      context) throws IOException, InterruptedException;
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * edge output.  It is easiest to ignore the key/value separator and use
+   * only the key instead.
+   */
+  protected abstract class TextEdgeWriter
+      extends EdgeWriter<I, V, E> {
+    /** Internal line record writer */
+    private RecordWriter<Text, Text> lineRecordWriter;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+           InterruptedException {
+      lineRecordWriter = createLineRecordWriter(context);
+      this.context = context;
+    }
+
+    /**
+     * Create the line record writer. Override this to use a different
+     * underlying record writer (useful for testing).
+     *
+     * @param  context the context passed to initialize
+     * @return the record writer to be used
+     * @throws IOException          exception that can be thrown during creation
+     * @throws InterruptedException exception that can be thrown during creation
+     */
+    protected RecordWriter<Text, Text> createLineRecordWriter(
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      return textOutputFormat.getRecordWriter(context);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      lineRecordWriter.close(context);
+    }
+
+    /**
+     * Get the line record writer.
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<Text, Text> getRecordWriter() {
+      return lineRecordWriter;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user to write a line for each
+   * edge.
+   */
+  protected abstract class TextEdgeWriterToEachLine extends TextEdgeWriter {
+
+    @Override
+    public final void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
+      throws IOException, InterruptedException {
+
+      // Note we are writing line as key with null value
+      getRecordWriter().write(
+          convertEdgeToLine(sourceId, sourceValue, edge), null);
+    }
+
+    /**
+     * Writes a line for the given edge.
+     *
+     * @param sourceId    the current id of the source vertex
+     * @param sourceValue the current value of the source vertex
+     * @param edge        the current edge for writing
+     * @return the text line to be written
+     * @throws IOException exception that can be thrown while writing
+     */
+    protected abstract Text convertEdgeToLine(I sourceId,
+      V sourceValue, Edge<I, E> edge) throws IOException;
+  }
+}
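As an illustration of the extension point, a minimal (hypothetical) subclass
in the style of SrcIdDstIdEdgeValueTextOutputFormat, writing only the edge
value on each line, could look like:

    public class EdgeValueTextOutputFormat<I extends WritableComparable,
        V extends Writable, E extends Writable>
        extends TextEdgeOutputFormat<I, V, E> {

      @Override
      public TextEdgeWriter createEdgeWriter(TaskAttemptContext context) {
        return new EdgeValueEdgeWriter();
      }

      /** Writes one line per edge, containing only the edge value. */
      protected class EdgeValueEdgeWriter extends TextEdgeWriterToEachLine {
        @Override
        protected Text convertEdgeToLine(I sourceId, V sourceValue,
            Edge<I, E> edge) throws IOException {
          return new Text(edge.getValue().toString());
        }
      }
    }
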

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
index c91d543..c57ecd7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.io.formats;
 
 import java.io.IOException;
+
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
@@ -29,7 +30,8 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_SUBDIR;
 
 /**
  * Abstract class that users should subclass to use their own text based
@@ -43,10 +45,14 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 public abstract class TextVertexOutputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends VertexOutputFormat<I, V, E> {
-
   /** Uses the TextOutputFormat to do everything */
-  protected TextOutputFormat<Text, Text> textOutputFormat =
-      new TextOutputFormat<Text, Text>();
+  protected GiraphTextOutputFormat textOutputFormat =
+    new GiraphTextOutputFormat() {
+      @Override
+      protected String getSubdir() {
+        return VERTEX_OUTPUT_FORMAT_SUBDIR.get(getConf());
+      }
+    };
 
   @Override
   public void checkOutputSpecs(JobContext context)
@@ -161,5 +167,4 @@ public abstract class TextVertexOutputFormat<I extends WritableComparable,
     protected abstract Text convertVertexToLine(Vertex<I, V, E> vertex)
       throws IOException;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
new file mode 100644
index 0000000..2222255
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
+import org.apache.giraph.job.HadoopUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * For internal use only.
+ *
+ * Wraps a user-set {@link EdgeOutputFormat} to make sure proper configuration
+ * parameters are passed around, so that the user can set parameters in the
+ * configuration and have them available in other methods related to this
+ * format.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public class WrappedEdgeOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends EdgeOutputFormat<I, V, E> {
+
+  /** {@link EdgeOutputFormat} which is wrapped */
+  private final EdgeOutputFormat<I, V, E> originalOutputFormat;
+
+  /**
+   * Constructor
+   *
+   * @param edgeOutputFormat Edge output format to wrap
+   */
+  public WrappedEdgeOutputFormat(
+      EdgeOutputFormat<I, V, E> edgeOutputFormat) {
+    originalOutputFormat = edgeOutputFormat;
+  }
+
+  @Override
+  public EdgeWriter<I, V, E> createEdgeWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    final EdgeWriter<I, V, E> edgeWriter =
+        originalOutputFormat.createEdgeWriter(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+    return new EdgeWriter<I, V, E>() {
+      @Override
+      public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+        super.setConf(conf);
+        edgeWriter.setConf(conf);
+      }
+
+      @Override
+      public void initialize(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+        edgeWriter.initialize(
+          HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void close(
+          TaskAttemptContext context) throws IOException, InterruptedException {
+        edgeWriter.close(
+          HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
+        throws IOException, InterruptedException {
+        edgeWriter.writeEdge(sourceId, sourceValue, edge);
+      }
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    originalOutputFormat.checkOutputSpecs(
+        HadoopUtils.makeJobContext(getConf(), context));
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+
+    final OutputCommitter outputCommitter =
+        originalOutputFormat.getOutputCommitter(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+
+    return new OutputCommitter() {
+      @Override
+      public void setupJob(JobContext context) throws IOException {
+        outputCommitter.setupJob(
+            HadoopUtils.makeJobContext(getConf(), context));
+      }
+
+      @Override
+      public void setupTask(TaskAttemptContext context) throws IOException {
+        outputCommitter.setupTask(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public boolean needsTaskCommit(
+          TaskAttemptContext context) throws IOException {
+        return outputCommitter.needsTaskCommit(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void commitTask(TaskAttemptContext context) throws IOException {
+        outputCommitter.commitTask(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void abortTask(TaskAttemptContext context) throws IOException {
+        outputCommitter.abortTask(
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+      }
+
+      @Override
+      public void cleanupJob(JobContext context) throws IOException {
+        outputCommitter.cleanupJob(
+            HadoopUtils.makeJobContext(getConf(), context));
+      }
+
+      /*if_not[HADOOP_NON_COMMIT_JOB]*/
+      @Override
+      public void commitJob(JobContext context) throws IOException {
+        outputCommitter.commitJob(
+            HadoopUtils.makeJobContext(getConf(), context));
+      }
+
+      @Override
+      public void abortJob(JobContext context,
+          JobStatus.State state) throws IOException {
+        outputCommitter.abortJob(
+            HadoopUtils.makeJobContext(getConf(), context), state);
+      }
+      /*end[HADOOP_NON_COMMIT_JOB]*/
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 745764b..4bc4f4d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -36,6 +36,7 @@ import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
@@ -97,10 +98,16 @@ public final class ConfigurationUtils {
     OPTIONS.addOption("w", "workers", true, "Number of workers");
     OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format");
     OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format");
-    OPTIONS.addOption("of", "outputFormat", true, "Vertex output format");
+    OPTIONS.addOption("vof", "vertexOutputFormat", true,
+        "Vertex output format");
+    OPTIONS.addOption("eof", "edgeOutputFormat", true, "Edge output format");
     OPTIONS.addOption("vip", "vertexInputPath", true, "Vertex input path");
     OPTIONS.addOption("eip", "edgeInputPath", true, "Edge input path");
-    OPTIONS.addOption("op", "outputPath", true, "Vertex output path");
+    OPTIONS.addOption("op",  "outputPath", true, "Output path");
+    OPTIONS.addOption("vsd",  "vertexSubDir", true, "subdirectory to be used " +
+        "for the vertex output");
+    OPTIONS.addOption("esd",  "edgeSubDir", true, "subdirectory to be used " +
+        "for the edge output");
     OPTIONS.addOption("c", "combiner", true, "Combiner class");
     OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class");
     OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
@@ -316,14 +323,46 @@ public final class ConfigurationUtils {
           "InputFormat does not require one.");
       }
     }
-    if (cmd.hasOption("of")) {
+    if (cmd.hasOption("vof")) {
       conf.setVertexOutputFormatClass(
           (Class<? extends VertexOutputFormat>) Class
-              .forName(cmd.getOptionValue("of")));
+              .forName(cmd.getOptionValue("vof")));
     } else {
       if (LOG.isInfoEnabled()) {
-        LOG.info("No output format specified. Ensure your OutputFormat " +
-          "does not require one.");
+        LOG.info("No vertex output format specified. Ensure your " +
+          "OutputFormat does not require one.");
+      }
+    }
+    if (cmd.hasOption("vof")) {
+      if (cmd.hasOption("vsd")) {
+        conf.setVertexOutputFormatSubdir(cmd.getOptionValue("vsd"));
+      }
+    }
+    if (cmd.hasOption("eof")) {
+      conf.setEdgeOutputFormatClass(
+          (Class<? extends EdgeOutputFormat>) Class
+              .forName(cmd.getOptionValue("eof")));
+    } else {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("No edge output format specified. Ensure your " +
+          "OutputFormat does not require one.");
+      }
+    }
+    if (cmd.hasOption("eof")) {
+      if (cmd.hasOption("esd")) {
+        conf.setEdgeOutputFormatSubdir(cmd.getOptionValue("esd"));
+      }
+    }
+    /* check for path clashes */
+    if (cmd.hasOption("vof") && cmd.hasOption("eof") && cmd.hasOption("op")) {
+      if (!cmd.hasOption("vsd") || !cmd.hasOption("esd")) {
+        if (!conf.hasEdgeOutputFormatSubdir() ||
+            !conf.hasVertexOutputFormatSubdir()) {
+
+          throw new IllegalArgumentException("If VertexOutputFormat and " +
+              "EdgeOutputFormat are both set, it is mandatory to provide " +
+              "both a vertex subdirectory and an edge subdirectory");
+        }
       }
     }
     if (cmd.hasOption("pc")) {
@@ -385,7 +424,7 @@ public final class ConfigurationUtils {
           Integer.parseInt(cmd.getOptionValue("yh")));
     }
     /*if[PURE_YARN]
-    if (cmd.hasOption("of")) {
+    if (cmd.hasOption("vof") || cmd.hasOption("eof")) {
       if (cmd.hasOption("op")) {
         // For YARN conf to get the out dir we need w/o a Job obj
         Path outputDir =
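
The new options split the old single -of/-op pair into vertex and edge
variants that share one output path. As a rough standalone sketch, the clash
check added above boils down to the following rule; the helper is
hypothetical, and null here means no subdirectory was supplied by either the
command line or the configuration:

/**
 * Throws if a vertex and an edge output format would both write into the
 * same output path without distinct subdirectories, since their files
 * would otherwise land in one directory and collide.
 */
static void checkOutputPathClash(boolean hasVertexOutputFormat,
    boolean hasEdgeOutputFormat, boolean hasOutputPath,
    String vertexSubdir, String edgeSubdir) {
  if (hasVertexOutputFormat && hasEdgeOutputFormat && hasOutputPath &&
      (vertexSubdir == null || edgeSubdir == null)) {
    throw new IllegalArgumentException("If VertexOutputFormat and " +
        "EdgeOutputFormat are both set, it is mandatory to provide both " +
        "a vertex subdirectory and an edge subdirectory");
  }
}

Keeping the rule in one small method like this also makes the option
combinations easy to unit-test in isolation.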

http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 9311fbd..112b76d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -32,6 +32,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerServer;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GlobalStats;
@@ -40,6 +41,8 @@ import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
@@ -919,13 +922,15 @@ public class BspServiceWorker<I extends WritableComparable,
    */
   private void saveVertices(long numLocalVertices) throws IOException,
       InterruptedException {
-    if (getConfiguration().getVertexOutputFormatClass() == null) {
+    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
+
+    if (conf.getVertexOutputFormatClass() == null) {
       LOG.warn("saveVertices: " +
           GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
           " not specified -- there will be no saved output");
       return;
     }
-    if (getConfiguration().doOutputDuringComputation()) {
+    if (conf.doOutputDuringComputation()) {
       if (LOG.isInfoEnabled()) {
         LOG.info("saveVertices: The option for doing output during " +
             "computation is selected, so there will be no saving of the " +
@@ -1024,12 +1029,126 @@ public class BspServiceWorker<I extends WritableComparable,
     }
   }
 
+  /**
+   * Save the edges using the user-defined EdgeOutputFormat, iterating
+   * over the partitions owned by this worker.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void saveEdges() throws IOException, InterruptedException {
+    final ImmutableClassesGiraphConfiguration<I, V, E> conf =
+      getConfiguration();
+
+    if (conf.getEdgeOutputFormatClass() == null) {
+      LOG.warn("saveEdges: " +
+               GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS +
+               " not specified -- there will be no saved edge output. " +
+               "Make sure that the EdgeOutputFormat is not required.");
+      return;
+    }
+
+    final int numPartitions = getPartitionStore().getNumPartitions();
+    int numThreads = Math.min(conf.getNumOutputThreads(),
+        numPartitions);
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveEdges: Starting to save the edges using " +
+        numThreads + " threads");
+    final EdgeOutputFormat<I, V, E> edgeOutputFormat =
+        conf.createWrappedEdgeOutputFormat();
+
+    final Queue<Integer> partitionIdQueue =
+        (numPartitions == 0) ? new LinkedList<Integer>() :
+            new ArrayBlockingQueue<Integer>(numPartitions);
+    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            EdgeWriter<I, V, E> edgeWriter =
+                edgeOutputFormat.createEdgeWriter(getContext());
+            edgeWriter.setConf(conf);
+            edgeWriter.initialize(getContext());
+
+            long nextPrintVertices = 0;
+            long nextPrintMsecs = System.currentTimeMillis() + 15000;
+            int partitionIndex = 0;
+            int numPartitions = getPartitionStore().getNumPartitions();
+            while (!partitionIdQueue.isEmpty()) {
+              Integer partitionId = partitionIdQueue.poll();
+              if (partitionId == null) {
+                break;
+              }
+
+              Partition<I, V, E> partition =
+                  getPartitionStore().getPartition(partitionId);
+              long vertices = 0;
+              long edges = 0;
+              long partitionEdgeCount = partition.getEdgeCount();
+              for (Vertex<I, V, E> vertex : partition) {
+                for (Edge<I, E> edge : vertex.getEdges()) {
+                  edgeWriter.writeEdge(vertex.getId(), vertex.getValue(), edge);
+                  ++edges;
+                }
+                ++vertices;
+
+                // Update status at most once per 250k vertices and per 15s
+                if (vertices > nextPrintVertices &&
+                    System.currentTimeMillis() > nextPrintMsecs) {
+                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+                      "saveEdges: Saved " + edges +
+                      " edges out of " + partitionEdgeCount +
+                      " partition edges, on partition " + partitionIndex +
+                      " out of " + numPartitions);
+                  nextPrintMsecs = System.currentTimeMillis() + 15000;
+                  nextPrintVertices = vertices + 250000;
+                }
+              }
+              getPartitionStore().putPartition(partition);
+              ++partitionIndex;
+            }
+            edgeWriter.close(getContext()); // the temp results are saved now
+            return null;
+          }
+        };
+      }
+    };
+    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+        "save-edges-%d", getContext());
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+      "saveEdges: Done saving edges.");
+    // YARN: must complete the commit of the "task" output ourselves;
+    // the Hadoop MR framework isn't there to do it for us.
+    if (conf.isPureYarnJob() &&
+      conf.getEdgeOutputFormatClass() != null) {
+      try {
+        OutputCommitter outputCommitter =
+          edgeOutputFormat.getOutputCommitter(getContext());
+        if (outputCommitter.needsTaskCommit(getContext())) {
+          LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+            "OutputCommitter: committing task output.");
+          // transfer from temp dirs to "task commit" dirs to prep for
+          // the master's OutputCommitter#commitJob(context) call to finish.
+          outputCommitter.commitTask(getContext());
+        }
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted while attempting to obtain " +
+          "OutputCommitter.", ie);
+      } catch (IOException ioe) {
+        LOG.error("Master task's attempt to commit output has " +
+          "FAILED.", ioe);
+      }
+    }
+  }
+
   @Override
   public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
     throws IOException, InterruptedException {
     workerClient.closeConnections();
     setCachedSuperstep(getSuperstep() - 1);
     saveVertices(finishedSuperstepStats.getLocalVertexCount());
+    saveEdges();
     getPartitionStore().shutdown();
     // All worker processes should denote they are done by adding special
     // znode.  Once the number of znodes equals the number of partitions
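
The saveEdges() path added above relies on only three EdgeWriter callbacks:
initialize(), writeEdge() and close(). Below is a minimal sketch of a writer
a job could plug in through the new -eof option; the class is hypothetical,
and the tab-separated line format plus the use of Hadoop's TextOutputFormat
for the underlying RecordWriter are illustrative choices, not part of this
patch:

import java.io.IOException;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.EdgeWriter;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/** Writes one "sourceId<TAB>targetId" line per edge. */
public class TabSeparatedEdgeWriter
    extends EdgeWriter<LongWritable, DoubleWritable, NullWritable> {
  /** Underlying line writer, created once per task attempt. */
  private RecordWriter<Text, Text> lineWriter;

  @Override
  public void initialize(TaskAttemptContext context)
      throws IOException, InterruptedException {
    lineWriter = new TextOutputFormat<Text, Text>().getRecordWriter(context);
  }

  @Override
  public void writeEdge(LongWritable sourceId, DoubleWritable sourceValue,
      Edge<LongWritable, NullWritable> edge)
      throws IOException, InterruptedException {
    // One line per edge: source vertex id, tab, target vertex id.
    lineWriter.write(new Text(sourceId.toString()),
        new Text(edge.getTargetVertexId().toString()));
  }

  @Override
  public void close(TaskAttemptContext context)
      throws IOException, InterruptedException {
    lineWriter.close(context);
  }
}
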
@@ -1331,7 +1450,6 @@ else[HADOOP_NON_SECURE]*/
       }
     }
 
-
     try {
       workerClientRequestProcessor.flush();
       workerClient.waitAllRequests();