You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/03/30 09:14:14 UTC

git commit: updated refs/heads/trunk to 67f5f74

Updated Branches:
  refs/heads/trunk da2a68708 -> 67f5f7475


GIRAPH-600: Create an option to do output during computation (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/67f5f747
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/67f5f747
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/67f5f747

Branch: refs/heads/trunk
Commit: 67f5f747578b0e67944da2fea9e4a3d3a22c4e09
Parents: da2a687
Author: Maja Kabiljo <ma...@maja-mbp.local>
Authored: Sat Mar 30 01:13:29 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.local>
Committed: Sat Mar 30 01:13:29 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../giraph/bsp/CentralizedServiceWorker.java       |    8 +
 .../apache/giraph/conf/GiraphConfiguration.java    |   40 +++++
 .../org/apache/giraph/conf/GiraphConstants.java    |   16 ++
 .../conf/ImmutableClassesGiraphConfiguration.java  |   24 +++
 .../org/apache/giraph/graph/ComputeCallable.java   |   15 ++-
 .../org/apache/giraph/graph/GraphTaskManager.java  |    3 +-
 .../org/apache/giraph/io/SimpleVertexWriter.java   |   45 ++++++
 .../java/org/apache/giraph/io/VertexWriter.java    |   13 +--
 .../MultiThreadedSuperstepOutput.java              |  119 +++++++++++++++
 .../io/superstep_output/NoOpSuperstepOutput.java   |   57 +++++++
 .../io/superstep_output/SuperstepOutput.java       |   57 +++++++
 .../SynchronizedSuperstepOutput.java               |   95 ++++++++++++
 .../giraph/io/superstep_output/package-info.java   |   21 +++
 .../org/apache/giraph/worker/BspServiceWorker.java |   21 +++-
 15 files changed, 521 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 8263026..e0a82a6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-600: Create an option to do output during computation (majakabiljo)
+
   GIRAPH-599: Hive IO dependency issues with some Hadoop profiles (nitay via majakabiljo)
 
   GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta)

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 56b5d03..1c7bde4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.superstep_output.SuperstepOutput;
 import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
@@ -223,6 +224,13 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
   void prepareSuperstep();
 
   /**
+   * Get this worker's superstep output (used to write vertices mid-run)
+   *
+   * @return SuperstepOutput
+   */
+  SuperstepOutput<I, V, E> getSuperstepOutput();
+
+  /**
    * Clean up the service (no calls may be issued after this)
    *
    * @param finishedSuperstepStats Finished supestep stats

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 963b82a..040c26f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -262,6 +262,46 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Check if output should be done during computation
+   *
+   * @return True iff output should be done during computation
+   */
+  public final boolean doOutputDuringComputation() {
+    return DO_OUTPUT_DURING_COMPUTATION.get(this);
+  }
+
+  /**
+   * Set whether or not we should do output during computation
+   *
+   * @param doOutputDuringComputation True iff we want output to happen
+   *                                  during computation
+   */
+  public final void setDoOutputDuringComputation(
+      boolean doOutputDuringComputation) {
+    DO_OUTPUT_DURING_COMPUTATION.set(this, doOutputDuringComputation);
+  }
+
+  /**
+   * Check if VertexOutputFormat is thread-safe
+   *
+   * @return True iff VertexOutputFormat is thread-safe
+   */
+  public final boolean vertexOutputFormatThreadSafe() {
+    return VERTEX_OUTPUT_FORMAT_THREAD_SAFE.get(this);
+  }
+
+  /**
+   * Set whether or not selected VertexOutputFormat is thread-safe
+   *
+   * @param vertexOutputFormatThreadSafe True iff selected VertexOutputFormat
+   *                                     is thread-safe
+   */
+  public final void setVertexOutputFormatThreadSafe(
+      boolean vertexOutputFormatThreadSafe) {
+    VERTEX_OUTPUT_FORMAT_THREAD_SAFE.set(this, vertexOutputFormatThreadSafe);
+  }
+
+  /**
    * Set the vertex combiner class (optional)
    *
    * @param vertexCombinerClass Determines how vertex messages are combined

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c5b9b93..eaa8363 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -124,6 +124,22 @@ public interface GiraphConstants {
   ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
       ClassConfOption.create("giraph.vertexOutputFormatClass", null,
           VertexOutputFormat.class);
+  /**
+   * If you use this option, instead of saving vertices at the end of the
+   * application, writeVertex will be called right after each vertex.compute()
+   * is called.
+   * NOTE: This feature doesn't work well with checkpointing - if you restart
+   * from a checkpoint you won't have any output from previous supersteps.
+   */
+  BooleanConfOption DO_OUTPUT_DURING_COMPUTATION =
+      new BooleanConfOption("giraph.doOutputDuringComputation", false);
+  /**
+   * Vertex output format thread-safe - if your VertexOutputFormat allows
+   * several VertexWriters to be created and written to in parallel,
+   * you should set this to true.
+   */
+  BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE =
+      new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false);
 
   /** Output Format Path (for Giraph-on-YARN) */
   String GIRAPH_OUTPUT_DIR = "giraph.output.dir";

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 76f6105..e290c57 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -31,6 +31,10 @@ import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
+import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
+import org.apache.giraph.io.superstep_output.SuperstepOutput;
+import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
 import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
@@ -50,6 +54,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.Progressable;
 
 import static org.apache.giraph.conf.GiraphConstants.USE_UNSAFE_SERIALIZATION;
@@ -201,6 +206,25 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create the proper superstep output, based on the configuration settings.
+   *
+   * @param context Mapper context
+   * @return SuperstepOutput
+   */
+  public SuperstepOutput<I, V, E> createSuperstepOutput(
+      Mapper<?, ?, ?, ?>.Context context) {
+    if (doOutputDuringComputation()) {
+      if (vertexOutputFormatThreadSafe()) {
+        return new MultiThreadedSuperstepOutput<I, V, E>(this, context);
+      } else {
+        return new SynchronizedSuperstepOutput<I, V, E>(this, context);
+      }
+    } else {
+      return new NoOpSuperstepOutput<I, V, E>();
+    }
+  }
+
+  /**
    * Does the job have an {@link EdgeInputFormat}?
    *
    * @return True iff an {@link EdgeInputFormat} has been specified.

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 4840471..51ed4f6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -22,6 +22,7 @@ import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.SimpleVertexWriter;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
@@ -89,6 +90,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   /** Sends the messages (unique per Callable) */
   private WorkerClientRequestProcessor<I, V, E, M>
   workerClientRequestProcessor;
+  /** VertexWriter for this ComputeCallable */
+  private SimpleVertexWriter<I, V, E> vertexWriter;
   /** Get the start time in nanos */
   private final long startNanos = TIME.getNanoseconds();
 
@@ -143,6 +146,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
         context, graphState.getGraphTaskManager(), workerClientRequestProcessor,
         aggregatorUsage);
 
+    vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
+
     List<PartitionStats> partitionStatsList = Lists.newArrayList();
     while (!partitionIdQueue.isEmpty()) {
       Integer partitionId = partitionIdQueue.poll();
@@ -165,11 +170,17 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       } catch (IOException e) {
         throw new IllegalStateException("call: Caught unexpected IOException," +
             " failing.", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("call: Caught unexpected " +
+            "InterruptedException, failing.", e);
       } finally {
         serviceWorker.getPartitionStore().putPartition(partition);
       }
     }
 
+    // Return VertexWriter after the usage
+    serviceWorker.getSuperstepOutput().returnVertexWriter(vertexWriter);
+
     if (LOG.isInfoEnabled()) {
       float seconds = Times.getNanosSince(TIME, startNanos) /
           Time.NS_PER_SECOND_AS_FLOAT;
@@ -193,7 +204,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
    * @return Partition stats for this computed partition
    */
   private PartitionStats computePartition(Partition<I, V, E, M> partition)
-    throws IOException {
+    throws IOException, InterruptedException {
     PartitionStats partitionStats =
         new PartitionStats(partition.getId(), 0, 0, 0, 0);
     // Make sure this is thread-safe across runs
@@ -225,6 +236,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
           }
           // Need to unwrap the mutated edges (possibly)
           vertex.unwrapMutableEdges();
+          // Write vertex to superstep output (no-op if it is not used)
+          vertexWriter.writeVertex(vertex);
           // Need to save the vertex changes (possibly)
           partition.saveVertex(vertex);
         }

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 57f7dff..3ae5ed3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -288,9 +288,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   /**
    * Handle post-application callbacks.
    */
-  private void postApplication() {
+  private void postApplication() throws IOException, InterruptedException {
     GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
     serviceWorker.getWorkerContext().postApplication();
+    serviceWorker.getSuperstepOutput().postApplication();
     postAppTimerContext.stop();
     context.progress();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
new file mode 100644
index 0000000..e4c3496
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Interface which can only write vertices (extended by VertexWriter)
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public interface SimpleVertexWriter<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+  /**
+   * Writes the next vertex and associated data
+   *
+   * @param vertex Vertex to write
+   * @throws IOException on failure while writing
+   * @throws InterruptedException if interrupted while writing
+   */
+  void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+      InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
index 38c5548..2aa3a71 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
@@ -20,7 +20,6 @@ package org.apache.giraph.io;
 
 import java.io.IOException;
 
-import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -34,7 +33,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  */
 @SuppressWarnings("rawtypes")
 public interface VertexWriter<I extends WritableComparable, V extends Writable,
-    E extends Writable> {
+    E extends Writable> extends SimpleVertexWriter<I, V, E> {
   /**
    * Use the context to setup writing the vertices.
    * Guaranteed to be called prior to any other function.
@@ -47,16 +46,6 @@ public interface VertexWriter<I extends WritableComparable, V extends Writable,
     InterruptedException;
 
   /**
-   * Writes the next vertex and associated data
-   *
-   * @param vertex set the properties of this vertex
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  void writeVertex(Vertex<I, V, E, ?> vertex)
-    throws IOException, InterruptedException;
-
-  /**
    * Close this {@link VertexWriter} to future operations.
    *
    * @param context the context of the task

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
new file mode 100644
index 0000000..a09f915
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.superstep_output;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
+ * thread-safe. Writers are pooled: each borrower gets its own VertexWriter.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class MultiThreadedSuperstepOutput<I extends WritableComparable,
+    V extends Writable, E extends Writable> implements
+    SuperstepOutput<I, V, E> {
+  /** Mapper context */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Vertex output format, used to get new vertex writers */
+  private final VertexOutputFormat<I, V, E> vertexOutputFormat;
+  /**
+   * Vertex writers which were returned and can be reused; they are all
+   * closed in postApplication()
+   */
+  private final List<VertexWriter<I, V, E>> availableVertexWriters;
+  /** Vertex writers which were handed out and are not yet returned */
+  private final Set<VertexWriter<I, V, E>> occupiedVertexWriters;
+
+  /**
+   * Constructor
+   *
+   * @param conf    Configuration
+   * @param context Mapper context
+   */
+  public MultiThreadedSuperstepOutput(
+      ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
+      Mapper<?, ?, ?, ?>.Context context) {
+    vertexOutputFormat = conf.createVertexOutputFormat();
+    this.context = context;
+    availableVertexWriters = Lists.newArrayList();
+    occupiedVertexWriters = Sets.newHashSet();
+  }
+
+  @Override
+  public synchronized SimpleVertexWriter<I, V, E> getVertexWriter() {
+    VertexWriter<I, V, E> vertexWriter;
+    if (availableVertexWriters.isEmpty()) {
+      try {
+        vertexWriter = vertexOutputFormat.createVertexWriter(context);
+        vertexWriter.initialize(context);
+      } catch (IOException e) {
+        throw new IllegalStateException("getVertexWriter: " +
+            "IOException occurred", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("getVertexWriter: " +
+            "InterruptedException occurred", e);
+      }
+    } else {
+      vertexWriter =
+          availableVertexWriters.remove(availableVertexWriters.size() - 1);
+    }
+    occupiedVertexWriters.add(vertexWriter);
+    return vertexWriter;
+  }
+
+  @Override
+  public synchronized void returnVertexWriter(
+      SimpleVertexWriter<I, V, E> vertexWriter) {
+    VertexWriter<I, V, E> returnedWriter = (VertexWriter<I, V, E>) vertexWriter;
+    if (!occupiedVertexWriters.remove(returnedWriter)) {
+      throw new IllegalStateException("returnVertexWriter: " +
+          "Returned vertex writer which is not currently occupied!");
+    }
+    availableVertexWriters.add(returnedWriter);
+  }
+
+  @Override
+  public synchronized void postApplication() throws IOException,
+      InterruptedException {
+    if (!occupiedVertexWriters.isEmpty()) {
+      throw new IllegalStateException("postApplication: " +
+          occupiedVertexWriters.size() +
+          " vertex writers were not returned!");
+    }
+    for (VertexWriter<I, V, E> vertexWriter : availableVertexWriters) {
+      vertexWriter.close(context);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
new file mode 100644
index 0000000..82684b2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.superstep_output;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Class to use as {@link SuperstepOutput} when output during computation is
+ * disabled. All the methods are no-ops.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class NoOpSuperstepOutput<I extends WritableComparable,
+    V extends Writable, E extends Writable> implements
+    SuperstepOutput<I, V, E> {
+  @Override
+  public SimpleVertexWriter<I, V, E> getVertexWriter() {
+    return new SimpleVertexWriter<I, V, E>() {
+      @Override
+      public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+          InterruptedException { // Intentionally discard the vertex
+      }
+    };
+  }
+
+  @Override
+  public void returnVertexWriter(
+      SimpleVertexWriter<I, V, E> vertexWriter) {
+  }
+
+  @Override
+  public void postApplication() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java
new file mode 100644
index 0000000..107039a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.superstep_output;
+
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Interface for outputting data during the computation.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public interface SuperstepOutput<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+
+  /**
+   * Get the Writer. You have to return it after usage in order for it to be
+   * properly closed.
+   *
+   * @return SimpleVertexWriter
+   */
+  SimpleVertexWriter<I, V, E> getVertexWriter();
+
+  /**
+   * Return the Writer after usage, which you got by calling
+   * {@link #getVertexWriter()}
+   *
+   * @param vertexWriter SimpleVertexWriter which you are returning
+   */
+  void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter);
+
+  /**
+   * Finalize this output (e.g. close writers) at the end of the application
+   */
+  void postApplication() throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
new file mode 100644
index 0000000..9617b24
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.superstep_output;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+/**
+ * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
+ * not thread-safe: one shared VertexWriter, with synchronized writeVertex.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class SynchronizedSuperstepOutput<I extends WritableComparable,
+    V extends Writable, E extends Writable> implements
+    SuperstepOutput<I, V, E> {
+  /** Mapper context */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Single underlying vertex writer, closed in postApplication() */
+  private final VertexWriter<I, V, E> vertexWriter;
+  /**
+   * Simple vertex writer, wrapper for {@link #vertexWriter}.
+   * Call to writeVertex is thread-safe.
+   */
+  private final SimpleVertexWriter<I, V, E> simpleVertexWriter;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   * @param context Mapper context
+   */
+  public SynchronizedSuperstepOutput(
+      ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
+      Mapper<?, ?, ?, ?>.Context context) {
+    this.context = context;
+    try {
+      vertexWriter =
+          conf.createVertexOutputFormat().createVertexWriter(context);
+      vertexWriter.initialize(context);
+    } catch (IOException e) {
+      throw new IllegalStateException("SynchronizedSuperstepOutput: " +
+          "IOException occurred", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("SynchronizedSuperstepOutput: " +
+          "InterruptedException occurred", e);
+    }
+    simpleVertexWriter = new SimpleVertexWriter<I, V, E>() {
+      @Override
+      public synchronized void writeVertex(
+          Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+        vertexWriter.writeVertex(vertex);
+      }
+    };
+  }
+
+  @Override
+  public SimpleVertexWriter<I, V, E> getVertexWriter() {
+    return simpleVertexWriter;
+  }
+
+  @Override
+  public void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter) {
+  }
+
+  @Override
+  public void postApplication() throws IOException, InterruptedException {
+    vertexWriter.close(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java
new file mode 100644
index 0000000..6523c74
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Classes related to writing vertex output during computation.
+ */
+package org.apache.giraph.io.superstep_output;

http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 74c1f87..35db999 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -41,6 +41,7 @@ import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.GlobalStats;
+import org.apache.giraph.io.superstep_output.SuperstepOutput;
 import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexOutputFormat;
@@ -149,6 +150,9 @@ public class BspServiceWorker<I extends WritableComparable,
   /** Handler for aggregators */
   private final WorkerAggregatorHandler aggregatorHandler;
 
+  /** Superstep output */
+  private SuperstepOutput<I, V, E> superstepOutput;
+
   /** array of observers to call back to */
   private final WorkerObserver[] observers;
 
@@ -175,7 +179,7 @@ public class BspServiceWorker<I extends WritableComparable,
     GraphTaskManager<I, V, E, M> graphTaskManager)
     throws IOException, InterruptedException {
     super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
-    ImmutableClassesGiraphConfiguration conf = getConfiguration();
+    ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration();
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
     workerGraphPartitioner =
@@ -193,6 +197,8 @@ public class BspServiceWorker<I extends WritableComparable,
 
     aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
 
+    superstepOutput = conf.createSuperstepOutput(context);
+
     if (conf.isJMapHistogramDumpEnabled()) {
       conf.addWorkerObserverClass(JMapHistoDumper.class);
     }
@@ -926,6 +932,14 @@ else[HADOOP_NON_SECURE]*/
           " not specified -- there will be no saved output");
       return;
     }
+    if (getConfiguration().doOutputDuringComputation()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("saveVertices: The option for doing output during " +
+            "computation is selected, so there will be no saving of the " +
+            "output in the end of application");
+      }
+      return;
+    }
 
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
         "saveVertices: Starting to save " + numLocalVertices + " vertices");
@@ -1464,4 +1478,9 @@ else[HADOOP_NON_SECURE]*/
       aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
     }
   }
+
+  /**
+   * Returns the superstep output that was created from the configuration
+   * when this worker was constructed.
+   *
+   * @return Superstep output of this worker
+   */
+  @Override
+  public SuperstepOutput<I, V, E> getSuperstepOutput() {
+    return this.superstepOutput;
+  }
 }