You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/04/10 20:37:22 UTC

git commit: updated refs/heads/trunk to 621a022

Updated Branches:
  refs/heads/trunk 185b9ffc8 -> 621a022c0


GIRAPH-615: Add support for multithreaded output (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/621a022c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/621a022c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/621a022c

Branch: refs/heads/trunk
Commit: 621a022c0e910f38d961ec50a29e2686d7ae49a0
Parents: 185b9ff
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Wed Apr 10 11:28:17 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Wed Apr 10 11:36:50 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../apache/giraph/conf/GiraphConfiguration.java    |   24 +++
 .../org/apache/giraph/conf/GiraphConstants.java    |    3 +
 .../java/org/apache/giraph/edge/EdgeStore.java     |   86 +++++------
 .../org/apache/giraph/graph/ComputeCallable.java   |    3 +-
 .../org/apache/giraph/graph/GraphTaskManager.java  |   70 ++++-----
 .../org/apache/giraph/utils/CallableFactory.java   |   36 +++++
 .../org/apache/giraph/utils/ProgressableUtils.java |   39 +++++
 .../org/apache/giraph/worker/BspServiceWorker.java |  122 +++++++--------
 .../worker/EdgeInputSplitsCallableFactory.java     |    4 +-
 .../giraph/worker/InputSplitsCallableFactory.java  |   41 -----
 .../worker/VertexInputSplitsCallableFactory.java   |    4 +-
 12 files changed, 240 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index e79c763..d9c88ec 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-615: Add support for multithreaded output (majakabiljo)
+
   GIRAPH-612: Improve website for upcoming release (aching)
 
   GIRAPH-527: readVertexInputSplit is always reporting 0 vertices and 0 edges (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 90b05e3..01f22da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -302,6 +302,30 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Get the number of threads to use for writing output at the end of the
+   * application. Returns 1 if the vertex output format is not thread safe;
+   * otherwise the configured value, clamped to at least 1.
+   *
+   * @return Number of output threads
+   */
+  public final int getNumOutputThreads() {
+    if (!vertexOutputFormatThreadSafe()) {
+      return 1;
+    }
+    return Math.max(1, NUM_OUTPUT_THREADS.get(this));
+  }
+
+  /**
+   * Set the number of threads to use for writing output at the end of the
+   * application. Only used if {@link #vertexOutputFormatThreadSafe()} is true.
+   *
+   * @param numOutputThreads Number of output threads
+   */
+  public void setNumOutputThreads(int numOutputThreads) {
+    NUM_OUTPUT_THREADS.set(this, numOutputThreads);
+  }
+
+  /**
    * Set the vertex combiner class (optional)
    *
    * @param vertexCombinerClass Determines how vertex messages are combined

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 95c9862..21e094d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -140,6 +140,12 @@ public interface GiraphConstants {
    */
   BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE =
       new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false);
+  /**
+   * Number of threads for writing output at the end of the application.
+   * Ignored (treated as 1) unless giraph.vertexOutputFormatThreadSafe is true.
+   */
+  IntConfOption NUM_OUTPUT_THREADS =
+      new IntConfOption("giraph.numOutputThreads", 1);
 
   /** conf key for comma-separated list of jars to export to YARN workers */
   StrConfOption GIRAPH_YARN_LIBJARS =

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 01a67dd..e8cb620 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -19,19 +19,16 @@
 package org.apache.giraph.edge;
 
 import com.google.common.collect.MapMaker;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
-import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -178,53 +175,50 @@ public class EdgeStore<I extends WritableComparable,
         new ArrayBlockingQueue<Integer>(transientEdges.size());
     partitionIdQueue.addAll(transientEdges.keySet());
     int numThreads = configuration.getNumInputSplitsThreads();
-    ExecutorService movePartitionExecutor =
-        Executors.newFixedThreadPool(numThreads,
-            new ThreadFactoryBuilder().setNameFormat("move-edges-%d").build());
 
-    for (int i = 0; i < numThreads; ++i) {
-      Callable moveCallable = new Callable<Void>() {
-        @Override
-        public Void call() throws Exception {
-          Integer partitionId;
-          while ((partitionId = partitionIdQueue.poll()) != null) {
-            Partition<I, V, E, M> partition =
-                service.getPartitionStore().getPartition(partitionId);
-            ConcurrentMap<I, VertexEdges<I, E>> partitionEdges =
-                transientEdges.remove(partitionId);
-            for (I vertexId : partitionEdges.keySet()) {
-              VertexEdges<I, E> vertexEdges = convertInputToComputeEdges(
-                  partitionEdges.remove(vertexId));
-              Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
-              // If the source vertex doesn't exist, create it. Otherwise,
-              // just set the edges.
-              if (vertex == null) {
-                vertex = configuration.createVertex();
-                vertex.initialize(vertexId, configuration.createVertexValue(),
-                    vertexEdges);
-                partition.putVertex(vertex);
-              } else {
-                vertex.setEdges(vertexEdges);
-                // Some Partition implementations (e.g. ByteArrayPartition)
-                // require us to put back the vertex after modifying it.
-                partition.saveVertex(vertex);
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            Integer partitionId;
+            while ((partitionId = partitionIdQueue.poll()) != null) {
+              Partition<I, V, E, M> partition =
+                  service.getPartitionStore().getPartition(partitionId);
+              ConcurrentMap<I, VertexEdges<I, E>> partitionEdges =
+                  transientEdges.remove(partitionId);
+              for (I vertexId : partitionEdges.keySet()) {
+                VertexEdges<I, E> vertexEdges = convertInputToComputeEdges(
+                    partitionEdges.remove(vertexId));
+                Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
+                // If the source vertex doesn't exist, create it. Otherwise,
+                // just set the edges.
+                if (vertex == null) {
+                  vertex = configuration.createVertex();
+                  vertex.initialize(vertexId, configuration.createVertexValue(),
+                      vertexEdges);
+                  partition.putVertex(vertex);
+                } else {
+                  vertex.setEdges(vertexEdges);
+                  // Some Partition implementations (e.g. ByteArrayPartition)
+                  // require us to put back the vertex after modifying it.
+                  partition.saveVertex(vertex);
+                }
               }
+              // Some PartitionStore implementations
+              // (e.g. DiskBackedPartitionStore) require us to put back the
+              // partition after modifying it.
+              service.getPartitionStore().putPartition(partition);
             }
-            // Some PartitionStore implementations
-            // (e.g. DiskBackedPartitionStore) require us to put back the
-            // partition after modifying it.
-            service.getPartitionStore().putPartition(partition);
+            return null;
           }
-          return null;
-        }
-      };
-      movePartitionExecutor.submit(
-          new LogStacktraceCallable<Void>(moveCallable));
-    }
+        };
+      }
+    };
+    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+        "move-edges-%d", progressable);
 
-    movePartitionExecutor.shutdown();
-    ProgressableUtils.awaitExecutorTermination(movePartitionExecutor,
-        progressable);
     transientEdges.clear();
 
     if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 51ed4f6..0fc5fdf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -68,7 +68,8 @@ import java.util.concurrent.Callable;
  * @param <M> Message data
  */
 public class ComputeCallable<I extends WritableComparable, V extends Writable,
-    E extends Writable, M extends Writable> implements Callable {
+    E extends Writable, M extends Writable>
+    implements Callable<Collection<PartitionStats>> {
   /** Class logger */
   private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
   /** Class time object */

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index abca4c4..97cf55d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -37,7 +37,7 @@ import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
-import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReflectionUtils;
@@ -56,9 +56,6 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.URL;
@@ -69,9 +66,7 @@ import java.util.Enumeration;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
@@ -725,13 +720,13 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    * @param numPartitions the number of data partitions (vertices) to process
    * @param numThreads number of concurrent threads to do processing
    */
-  private void processGraphPartitions(Mapper<?, ?, ?, ?>.Context context,
-    List<PartitionStats> partitionStatsList, GraphState<I, V, E, M> graphState,
-    MessageStoreByPartition<I, M> messageStore, int numPartitions,
-    int numThreads) {
-    List<Future<Collection<PartitionStats>>> partitionFutures =
-      Lists.newArrayListWithCapacity(numPartitions);
-    BlockingQueue<Integer> computePartitionIdQueue =
+  private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
+      List<PartitionStats> partitionStatsList,
+      final GraphState<I, V, E, M> graphState,
+      final MessageStoreByPartition<I, M> messageStore,
+      int numPartitions,
+      int numThreads) {
+    final BlockingQueue<Integer> computePartitionIdQueue =
       new ArrayBlockingQueue<Integer>(numPartitions);
     for (Integer partitionId :
       serviceWorker.getPartitionStore().getPartitionIds()) {
@@ -741,32 +736,27 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     GiraphTimerContext computeAllTimerContext = computeAll.time();
     timeToFirstMessageTimerContext = timeToFirstMessage.time();
 
-    ExecutorService partitionExecutor =
-      Executors.newFixedThreadPool(numThreads,
-        new ThreadFactoryBuilder().setNameFormat("compute-%d").build());
-    for (int i = 0; i < numThreads; ++i) {
-      ComputeCallable<I, V, E, M> computeCallable =
-        new ComputeCallable<I, V, E, M>(
-          context,
-          graphState,
-          messageStore,
-          computePartitionIdQueue,
-          conf,
-          serviceWorker);
-      LogStacktraceCallable<Collection<PartitionStats>> wrapped =
-          new LogStacktraceCallable<Collection<PartitionStats>>(
-              computeCallable);
-      partitionFutures.add(partitionExecutor.submit(wrapped));
-    }
-
-    // Wait until all the threads are done to wait on all requests
-    for (Future<Collection<PartitionStats>> partitionFuture :
-      partitionFutures) {
-      Collection<PartitionStats> stats =
-        ProgressableUtils.getFutureResult(partitionFuture, context);
-      partitionStatsList.addAll(stats);
-    }
-    partitionExecutor.shutdown();
+    CallableFactory<Collection<PartitionStats>> callableFactory =
+        new CallableFactory<Collection<PartitionStats>>() {
+          @Override
+          public Callable<Collection<PartitionStats>> newCallable(
+              int callableId) {
+            return new ComputeCallable<I, V, E, M>(
+                context,
+                graphState,
+                messageStore,
+                computePartitionIdQueue,
+                conf,
+                serviceWorker);
+          }
+        };
+    List<Collection<PartitionStats>> results =
+        ProgressableUtils.getResultsWithNCallables(callableFactory,
+            numThreads, "compute-%d", context);
+    for (Collection<PartitionStats> result : results) {
+      partitionStatsList.addAll(result);
+    }
+
     computeAllTimerContext.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java
new file mode 100644
index 0000000..72d5b15
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Factory for creating {@link Callable}s, for example one per thread when
+ * used with {@link ProgressableUtils#getResultsWithNCallables}.
+ *
+ * @param <R> Callable result type
+ */
+public interface CallableFactory<R> {
+  /**
+   * Create a new callable.
+   *
+   * @param callableId Id of the callable; getResultsWithNCallables passes
+   *                   0, 1, ..., numThreads - 1
+   * @return Callable
+   */
+  Callable<R> newCallable(int callableId);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
index 77eb49a..3b06604 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
@@ -23,8 +23,14 @@ import org.apache.log4j.Logger;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -157,6 +163,39 @@ public class ProgressableUtils {
   }
 
   /**
+   * Create {@code numThreads} callables from {@code callableFactory},
+   * run them on a fixed thread pool and gather their results in order.
+   *
+   * @param callableFactory Factory for Callables
+   * @param numThreads Number of threads to use (must be positive)
+   * @param threadNameFormat Format for thread names (e.g. "compute-%d")
+   * @param progressable Progressable for reporting progress
+   * @param <R> Type of Callable's results
+   * @return List of results from Callables, one per callable id
+   */
+  public static <R> List<R> getResultsWithNCallables(
+      CallableFactory<R> callableFactory, int numThreads,
+      String threadNameFormat, Progressable progressable) {
+    ExecutorService executorService =
+        Executors.newFixedThreadPool(numThreads,
+            new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build());
+    List<Future<R>> futures = Lists.newArrayListWithCapacity(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      Callable<R> callable = callableFactory.newCallable(i);
+      Future<R> future = executorService.submit(
+          new LogStacktraceCallable<R>(callable));
+      futures.add(future);
+    }
+    executorService.shutdown();
+    List<R> futureResults = Lists.newArrayListWithCapacity(numThreads);
+    for (Future<R> future : futures) {
+      R result = ProgressableUtils.getFutureResult(future, progressable);
+      futureResults.add(result);
+    }
+    return futureResults;
+  }
+
+  /**
    * Interface for waiting on a result from some operation.
    *
    * @param <T> Result type.

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index c20d06e..037cdfc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -42,7 +42,7 @@ import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
-import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexOutputFormat;
@@ -85,7 +85,6 @@ import org.json.JSONException;
 import org.json.JSONObject;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import net.iharder.Base64;
 
 import java.io.ByteArrayOutputStream;
@@ -102,9 +101,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -263,7 +259,7 @@ public class BspServiceWorker<I extends WritableComparable,
    */
   private VertexEdgeCount loadInputSplits(
       List<String> inputSplitPathList,
-      InputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory)
+      CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
     throws KeeperException, InterruptedException {
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
     // Determine how many threads to use based on the number of input splits
@@ -271,35 +267,20 @@ public class BspServiceWorker<I extends WritableComparable,
         getConfiguration().getMaxWorkers() + 1;
     int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
         maxInputSplitThreads);
-    ExecutorService inputSplitsExecutor =
-        Executors.newFixedThreadPool(numThreads,
-            new ThreadFactoryBuilder().setNameFormat("load-%d").build());
-    List<Future<VertexEdgeCount>> threadsFutures =
-        Lists.newArrayListWithCapacity(numThreads);
     if (LOG.isInfoEnabled()) {
       LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
           "originally " + getConfiguration().getNumInputSplitsThreads() +
           " threads(s) for " + inputSplitPathList.size() + " total splits.");
     }
-    for (int i = 0; i < numThreads; ++i) {
-      Callable<VertexEdgeCount> inputSplitsCallable =
-          inputSplitsCallableFactory.newCallable(i);
-      LogStacktraceCallable<VertexEdgeCount> wrapped =
-          new LogStacktraceCallable<VertexEdgeCount>(
-              inputSplitsCallable);
-      threadsFutures.add(inputSplitsExecutor.submit(wrapped));
-    }
 
-    // Wait until all the threads are done to wait on all requests
-    for (Future<VertexEdgeCount> threadFuture : threadsFutures) {
-      VertexEdgeCount threadVertexEdgeCount =
-          ProgressableUtils.getFutureResult(threadFuture, getContext());
-      vertexEdgeCount =
-          vertexEdgeCount.incrVertexEdgeCount(threadVertexEdgeCount);
+    List<VertexEdgeCount> results =
+        ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
+            numThreads, "load-%d", getContext());
+    for (VertexEdgeCount result : results) {
+      vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
     }
 
     workerClient.waitAllRequests();
-    inputSplitsExecutor.shutdown();
     return vertexEdgeCount;
   }
 
@@ -946,47 +927,60 @@ else[HADOOP_NON_SECURE]*/
       return;
     }
 
+    // Callable k saves entries k, k + numThreads, ... (each exactly once)
+    final List<Integer> partitionIds =
+        Lists.newArrayList(getPartitionStore().getPartitionIds());
+    // At least 1 thread, so the writer is created even with no partitions
+    final int numThreads = Math.max(1, Math.min(
+        getConfiguration().getNumOutputThreads(), partitionIds.size()));
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-        "saveVertices: Starting to save " + numLocalVertices + " vertices");
-    VertexOutputFormat<I, V, E> vertexOutputFormat =
+        "saveVertices: Starting to save " + numLocalVertices + " vertices " +
+            "using " + numThreads + " threads");
+    final VertexOutputFormat<I, V, E> vertexOutputFormat =
         getConfiguration().createVertexOutputFormat();
-    VertexWriter<I, V, E> vertexWriter =
-        vertexOutputFormat.createVertexWriter(getContext());
-    vertexWriter.setConf(
-        (ImmutableClassesGiraphConfiguration<I, V, E, Writable>)
-            getConfiguration());
-    vertexWriter.initialize(getContext());
-    long verticesWritten = 0;
-    long nextPrintVertices = 0;
-    long nextPrintMsecs = System.currentTimeMillis() + 15000;
-    int partitionIndex = 0;
-    int numPartitions = getPartitionStore().getNumPartitions();
-    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
-      Partition<I, V, E, M> partition =
-          getPartitionStore().getPartition(partitionId);
-      for (Vertex<I, V, E, M> vertex : partition) {
-        getContext().progress();
-        vertexWriter.writeVertex(vertex);
-        ++verticesWritten;
-
-        // Update status at most every 250k vertices or 15 seconds
-        if (verticesWritten > nextPrintVertices &&
-            System.currentTimeMillis() > nextPrintMsecs) {
-          LoggerUtils.setStatusAndLog(
-              getContext(), LOG, Level.INFO,
-              "saveVertices: Saved " +
-                  verticesWritten + " out of " + numLocalVertices +
-                  " vertices, on partition " + partitionIndex + " out of " +
-                  numPartitions);
-          nextPrintMsecs = System.currentTimeMillis() + 15000;
-          nextPrintVertices = verticesWritten + 250000;
-        }
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(final int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            VertexWriter<I, V, E> vertexWriter =
+                vertexOutputFormat.createVertexWriter(getContext());
+            vertexWriter.setConf(
+                (ImmutableClassesGiraphConfiguration<I, V, E, Writable>)
+                    getConfiguration());
+            vertexWriter.initialize(getContext());
+            long verticesWritten = 0;
+            long nextPrintVertices = 0;
+            long nextPrintMsecs = System.currentTimeMillis() + 15000;
+            for (int i = callableId; i < partitionIds.size(); i += numThreads) {
+              Partition<I, V, E, M> partition =
+                  getPartitionStore().getPartition(partitionIds.get(i));
+              for (Vertex<I, V, E, M> vertex : partition) {
+                vertexWriter.writeVertex(vertex);
+                ++verticesWritten;
+                // Update status at most every 250k vertices or 15 seconds
+                if (verticesWritten > nextPrintVertices &&
+                    System.currentTimeMillis() > nextPrintMsecs) {
+                  LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+                      "saveVertices: Thread " + callableId + " saved " +
+                          verticesWritten + " vertices, on partition " + i +
+                          " out of " + partitionIds.size());
+                  nextPrintMsecs = System.currentTimeMillis() + 15000;
+                  nextPrintVertices = verticesWritten + 250000;
+                }
+              }
+              // Some PartitionStore implementations need the partition back
+              getPartitionStore().putPartition(partition);
+            }
+            vertexWriter.close(getContext()); // the temp results are saved now
+            return null;
+          }
+        };
       }
-      getPartitionStore().putPartition(partition);
-      getContext().progress();
-      ++partitionIndex;
-    }
-    vertexWriter.close(getContext()); // the temp results are saved now
+    };
+    ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+        "save-vertices-%d", getContext());
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
       "saveVertices: Done saving vertices.");
     // YARN: must complete the commit the "task" output, Hadoop isn't there.

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 9297ac1..4a1705b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -20,6 +20,8 @@ package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -35,7 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
  */
 public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements InputSplitsCallableFactory<I, V, E, M> {
+    implements CallableFactory<VertexEdgeCount> {
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Graph state. */

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
deleted file mode 100644
index cdc6543..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Factory class for creating {@link InputSplitsCallable}s.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public interface InputSplitsCallableFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /**
-   * Return a newly-created {@link InputSplitsCallable}.
-   *
-   * @param threadId Id of input split thread
-   * @return A new {@link InputSplitsCallable}
-   */
-  InputSplitsCallable<I, V, E, M> newCallable(int threadId);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index aebca81..4eff3b8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -20,6 +20,8 @@ package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -35,7 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
  */
 public class VertexInputSplitsCallableFactory<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements InputSplitsCallableFactory<I, V, E, M> {
+    implements CallableFactory<VertexEdgeCount> {
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Graph state. */