You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/04/10 20:37:22 UTC
git commit: updated refs/heads/trunk to 621a022
Updated Branches:
refs/heads/trunk 185b9ffc8 -> 621a022c0
GIRAPH-615: Add support for multithreaded output (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/621a022c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/621a022c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/621a022c
Branch: refs/heads/trunk
Commit: 621a022c0e910f38d961ec50a29e2686d7ae49a0
Parents: 185b9ff
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Wed Apr 10 11:28:17 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Wed Apr 10 11:36:50 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../apache/giraph/conf/GiraphConfiguration.java | 24 +++
.../org/apache/giraph/conf/GiraphConstants.java | 3 +
.../java/org/apache/giraph/edge/EdgeStore.java | 86 +++++------
.../org/apache/giraph/graph/ComputeCallable.java | 3 +-
.../org/apache/giraph/graph/GraphTaskManager.java | 70 ++++-----
.../org/apache/giraph/utils/CallableFactory.java | 36 +++++
.../org/apache/giraph/utils/ProgressableUtils.java | 39 +++++
.../org/apache/giraph/worker/BspServiceWorker.java | 122 +++++++--------
.../worker/EdgeInputSplitsCallableFactory.java | 4 +-
.../giraph/worker/InputSplitsCallableFactory.java | 41 -----
.../worker/VertexInputSplitsCallableFactory.java | 4 +-
12 files changed, 240 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index e79c763..d9c88ec 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-615: Add support for multithreaded output (majakabiljo)
+
GIRAPH-612: Improve website for upcoming release (aching)
GIRAPH-527: readVertexInputSplit is always reporting 0 vertices and 0 edges (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 90b05e3..01f22da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -302,6 +302,30 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Get the number of threads to use for writing output at the end of the
+ * application. If output format is not thread safe, returns 1.
+ *
+ * @return Number of output threads
+ */
+ public final int getNumOutputThreads() {
+ if (!vertexOutputFormatThreadSafe()) {
+ return 1;
+ } else {
+ return NUM_OUTPUT_THREADS.get(this);
+ }
+ }
+
+ /**
+ * Set the number of threads to use for writing output at the end of the
+ * application. Used only if {@link #vertexOutputFormatThreadSafe()} is true.
+ *
+ * @param numOutputThreads Number of output threads
+ */
+ public void setNumOutputThreads(int numOutputThreads) {
+ NUM_OUTPUT_THREADS.set(this, numOutputThreads);
+ }
+
+ /**
* Set the vertex combiner class (optional)
*
* @param vertexCombinerClass Determines how vertex messages are combined
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 95c9862..21e094d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -140,6 +140,9 @@ public interface GiraphConstants {
*/
BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE =
new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false);
+ /** Number of threads for writing output at the end of the application */
+ IntConfOption NUM_OUTPUT_THREADS =
+ new IntConfOption("giraph.numOutputThreads", 1);
/** conf key for comma-separated list of jars to export to YARN workers */
StrConfOption GIRAPH_YARN_LIBJARS =
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 01a67dd..e8cb620 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -19,19 +19,16 @@
package org.apache.giraph.edge;
import com.google.common.collect.MapMaker;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
-import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -178,53 +175,50 @@ public class EdgeStore<I extends WritableComparable,
new ArrayBlockingQueue<Integer>(transientEdges.size());
partitionIdQueue.addAll(transientEdges.keySet());
int numThreads = configuration.getNumInputSplitsThreads();
- ExecutorService movePartitionExecutor =
- Executors.newFixedThreadPool(numThreads,
- new ThreadFactoryBuilder().setNameFormat("move-edges-%d").build());
- for (int i = 0; i < numThreads; ++i) {
- Callable moveCallable = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Integer partitionId;
- while ((partitionId = partitionIdQueue.poll()) != null) {
- Partition<I, V, E, M> partition =
- service.getPartitionStore().getPartition(partitionId);
- ConcurrentMap<I, VertexEdges<I, E>> partitionEdges =
- transientEdges.remove(partitionId);
- for (I vertexId : partitionEdges.keySet()) {
- VertexEdges<I, E> vertexEdges = convertInputToComputeEdges(
- partitionEdges.remove(vertexId));
- Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
- // If the source vertex doesn't exist, create it. Otherwise,
- // just set the edges.
- if (vertex == null) {
- vertex = configuration.createVertex();
- vertex.initialize(vertexId, configuration.createVertexValue(),
- vertexEdges);
- partition.putVertex(vertex);
- } else {
- vertex.setEdges(vertexEdges);
- // Some Partition implementations (e.g. ByteArrayPartition)
- // require us to put back the vertex after modifying it.
- partition.saveVertex(vertex);
+ CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ @Override
+ public Callable<Void> newCallable(int callableId) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Integer partitionId;
+ while ((partitionId = partitionIdQueue.poll()) != null) {
+ Partition<I, V, E, M> partition =
+ service.getPartitionStore().getPartition(partitionId);
+ ConcurrentMap<I, VertexEdges<I, E>> partitionEdges =
+ transientEdges.remove(partitionId);
+ for (I vertexId : partitionEdges.keySet()) {
+ VertexEdges<I, E> vertexEdges = convertInputToComputeEdges(
+ partitionEdges.remove(vertexId));
+ Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
+ // If the source vertex doesn't exist, create it. Otherwise,
+ // just set the edges.
+ if (vertex == null) {
+ vertex = configuration.createVertex();
+ vertex.initialize(vertexId, configuration.createVertexValue(),
+ vertexEdges);
+ partition.putVertex(vertex);
+ } else {
+ vertex.setEdges(vertexEdges);
+ // Some Partition implementations (e.g. ByteArrayPartition)
+ // require us to put back the vertex after modifying it.
+ partition.saveVertex(vertex);
+ }
}
+ // Some PartitionStore implementations
+ // (e.g. DiskBackedPartitionStore) require us to put back the
+ // partition after modifying it.
+ service.getPartitionStore().putPartition(partition);
}
- // Some PartitionStore implementations
- // (e.g. DiskBackedPartitionStore) require us to put back the
- // partition after modifying it.
- service.getPartitionStore().putPartition(partition);
+ return null;
}
- return null;
- }
- };
- movePartitionExecutor.submit(
- new LogStacktraceCallable<Void>(moveCallable));
- }
+ };
+ }
+ };
+ ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+ "move-edges-%d", progressable);
- movePartitionExecutor.shutdown();
- ProgressableUtils.awaitExecutorTermination(movePartitionExecutor,
- progressable);
transientEdges.clear();
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 51ed4f6..0fc5fdf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -68,7 +68,8 @@ import java.util.concurrent.Callable;
* @param <M> Message data
*/
public class ComputeCallable<I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> implements Callable {
+ E extends Writable, M extends Writable>
+ implements Callable<Collection<PartitionStats>> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(ComputeCallable.class);
/** Class time object */
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index abca4c4..97cf55d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -37,7 +37,7 @@ import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
-import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReflectionUtils;
@@ -56,9 +56,6 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URL;
@@ -69,9 +66,7 @@ import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
@@ -725,13 +720,13 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
* @param numPartitions the number of data partitions (vertices) to process
* @param numThreads number of concurrent threads to do processing
*/
- private void processGraphPartitions(Mapper<?, ?, ?, ?>.Context context,
- List<PartitionStats> partitionStatsList, GraphState<I, V, E, M> graphState,
- MessageStoreByPartition<I, M> messageStore, int numPartitions,
- int numThreads) {
- List<Future<Collection<PartitionStats>>> partitionFutures =
- Lists.newArrayListWithCapacity(numPartitions);
- BlockingQueue<Integer> computePartitionIdQueue =
+ private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
+ List<PartitionStats> partitionStatsList,
+ final GraphState<I, V, E, M> graphState,
+ final MessageStoreByPartition<I, M> messageStore,
+ int numPartitions,
+ int numThreads) {
+ final BlockingQueue<Integer> computePartitionIdQueue =
new ArrayBlockingQueue<Integer>(numPartitions);
for (Integer partitionId :
serviceWorker.getPartitionStore().getPartitionIds()) {
@@ -741,32 +736,27 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
GiraphTimerContext computeAllTimerContext = computeAll.time();
timeToFirstMessageTimerContext = timeToFirstMessage.time();
- ExecutorService partitionExecutor =
- Executors.newFixedThreadPool(numThreads,
- new ThreadFactoryBuilder().setNameFormat("compute-%d").build());
- for (int i = 0; i < numThreads; ++i) {
- ComputeCallable<I, V, E, M> computeCallable =
- new ComputeCallable<I, V, E, M>(
- context,
- graphState,
- messageStore,
- computePartitionIdQueue,
- conf,
- serviceWorker);
- LogStacktraceCallable<Collection<PartitionStats>> wrapped =
- new LogStacktraceCallable<Collection<PartitionStats>>(
- computeCallable);
- partitionFutures.add(partitionExecutor.submit(wrapped));
- }
-
- // Wait until all the threads are done to wait on all requests
- for (Future<Collection<PartitionStats>> partitionFuture :
- partitionFutures) {
- Collection<PartitionStats> stats =
- ProgressableUtils.getFutureResult(partitionFuture, context);
- partitionStatsList.addAll(stats);
- }
- partitionExecutor.shutdown();
+ CallableFactory<Collection<PartitionStats>> callableFactory =
+ new CallableFactory<Collection<PartitionStats>>() {
+ @Override
+ public Callable<Collection<PartitionStats>> newCallable(
+ int callableId) {
+ return new ComputeCallable<I, V, E, M>(
+ context,
+ graphState,
+ messageStore,
+ computePartitionIdQueue,
+ conf,
+ serviceWorker);
+ }
+ };
+ List<Collection<PartitionStats>> results =
+ ProgressableUtils.getResultsWithNCallables(callableFactory,
+ numThreads, "compute-%d", context);
+ for (Collection<PartitionStats> result : results) {
+ partitionStatsList.addAll(result);
+ }
+
computeAllTimerContext.stop();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java
new file mode 100644
index 0000000..72d5b15
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/CallableFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Factory for creating {@link Callable}s
+ *
+ * @param <R> Callable result type
+ */
+public interface CallableFactory<R> {
+ /**
+ * Create new callable
+ *
+ * @param callableId Id of the callable
+ * @return Callable
+ */
+ Callable<R> newCallable(int callableId);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
index 77eb49a..3b06604 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
@@ -23,8 +23,14 @@ import org.apache.log4j.Logger;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.group.ChannelGroupFuture;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -157,6 +163,39 @@ public class ProgressableUtils {
}
/**
+ * Create {@code numThreads} callables from {@code callableFactory},
+ * execute them and gather results.
+ *
+ * @param callableFactory Factory for Callables
+ * @param numThreads Number of threads to use
+ * @param threadNameFormat Format for thread name
+ * @param progressable Progressable for reporting progress
+ * @param <R> Type of Callable's results
+ * @return List of results from Callables
+ */
+ public static <R> List<R> getResultsWithNCallables(
+ CallableFactory<R> callableFactory, int numThreads,
+ String threadNameFormat, Progressable progressable) {
+ ExecutorService executorService =
+ Executors.newFixedThreadPool(numThreads,
+ new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build());
+ List<Future<R>> futures = Lists.newArrayListWithCapacity(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ Callable<R> callable = callableFactory.newCallable(i);
+ Future<R> future = executorService.submit(
+ new LogStacktraceCallable<R>(callable));
+ futures.add(future);
+ }
+ executorService.shutdown();
+ List<R> futureResults = Lists.newArrayListWithCapacity(numThreads);
+ for (Future<R> future : futures) {
+ R result = ProgressableUtils.getFutureResult(future, progressable);
+ futureResults.add(result);
+ }
+ return futureResults;
+ }
+
+ /**
* Interface for waiting on a result from some operation.
*
* @param <T> Result type.
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index c20d06e..037cdfc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -42,7 +42,7 @@ import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
-import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexOutputFormat;
@@ -85,7 +85,6 @@ import org.json.JSONException;
import org.json.JSONObject;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import net.iharder.Base64;
import java.io.ByteArrayOutputStream;
@@ -102,9 +101,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@@ -263,7 +259,7 @@ public class BspServiceWorker<I extends WritableComparable,
*/
private VertexEdgeCount loadInputSplits(
List<String> inputSplitPathList,
- InputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory)
+ CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
throws KeeperException, InterruptedException {
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
// Determine how many threads to use based on the number of input splits
@@ -271,35 +267,20 @@ public class BspServiceWorker<I extends WritableComparable,
getConfiguration().getMaxWorkers() + 1;
int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
maxInputSplitThreads);
- ExecutorService inputSplitsExecutor =
- Executors.newFixedThreadPool(numThreads,
- new ThreadFactoryBuilder().setNameFormat("load-%d").build());
- List<Future<VertexEdgeCount>> threadsFutures =
- Lists.newArrayListWithCapacity(numThreads);
if (LOG.isInfoEnabled()) {
LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
"originally " + getConfiguration().getNumInputSplitsThreads() +
" threads(s) for " + inputSplitPathList.size() + " total splits.");
}
- for (int i = 0; i < numThreads; ++i) {
- Callable<VertexEdgeCount> inputSplitsCallable =
- inputSplitsCallableFactory.newCallable(i);
- LogStacktraceCallable<VertexEdgeCount> wrapped =
- new LogStacktraceCallable<VertexEdgeCount>(
- inputSplitsCallable);
- threadsFutures.add(inputSplitsExecutor.submit(wrapped));
- }
- // Wait until all the threads are done to wait on all requests
- for (Future<VertexEdgeCount> threadFuture : threadsFutures) {
- VertexEdgeCount threadVertexEdgeCount =
- ProgressableUtils.getFutureResult(threadFuture, getContext());
- vertexEdgeCount =
- vertexEdgeCount.incrVertexEdgeCount(threadVertexEdgeCount);
+ List<VertexEdgeCount> results =
+ ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
+ numThreads, "load-%d", getContext());
+ for (VertexEdgeCount result : results) {
+ vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
}
workerClient.waitAllRequests();
- inputSplitsExecutor.shutdown();
return vertexEdgeCount;
}
@@ -946,47 +927,60 @@ else[HADOOP_NON_SECURE]*/
return;
}
+ int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
+ getPartitionStore().getNumPartitions());
LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
- "saveVertices: Starting to save " + numLocalVertices + " vertices");
- VertexOutputFormat<I, V, E> vertexOutputFormat =
+ "saveVertices: Starting to save " + numLocalVertices + " vertices " +
+ "using " + numThreads + " threads");
+ final VertexOutputFormat<I, V, E> vertexOutputFormat =
getConfiguration().createVertexOutputFormat();
- VertexWriter<I, V, E> vertexWriter =
- vertexOutputFormat.createVertexWriter(getContext());
- vertexWriter.setConf(
- (ImmutableClassesGiraphConfiguration<I, V, E, Writable>)
- getConfiguration());
- vertexWriter.initialize(getContext());
- long verticesWritten = 0;
- long nextPrintVertices = 0;
- long nextPrintMsecs = System.currentTimeMillis() + 15000;
- int partitionIndex = 0;
- int numPartitions = getPartitionStore().getNumPartitions();
- for (Integer partitionId : getPartitionStore().getPartitionIds()) {
- Partition<I, V, E, M> partition =
- getPartitionStore().getPartition(partitionId);
- for (Vertex<I, V, E, M> vertex : partition) {
- getContext().progress();
- vertexWriter.writeVertex(vertex);
- ++verticesWritten;
-
- // Update status at most every 250k vertices or 15 seconds
- if (verticesWritten > nextPrintVertices &&
- System.currentTimeMillis() > nextPrintMsecs) {
- LoggerUtils.setStatusAndLog(
- getContext(), LOG, Level.INFO,
- "saveVertices: Saved " +
- verticesWritten + " out of " + numLocalVertices +
- " vertices, on partition " + partitionIndex + " out of " +
- numPartitions);
- nextPrintMsecs = System.currentTimeMillis() + 15000;
- nextPrintVertices = verticesWritten + 250000;
- }
+ CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ @Override
+ public Callable<Void> newCallable(int callableId) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ VertexWriter<I, V, E> vertexWriter =
+ vertexOutputFormat.createVertexWriter(getContext());
+ vertexWriter.setConf(
+ (ImmutableClassesGiraphConfiguration<I, V, E, Writable>)
+ getConfiguration());
+ vertexWriter.initialize(getContext());
+ long verticesWritten = 0;
+ long nextPrintVertices = 0;
+ long nextPrintMsecs = System.currentTimeMillis() + 15000;
+ int partitionIndex = 0;
+ int numPartitions = getPartitionStore().getNumPartitions();
+ for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+ Partition<I, V, E, M> partition =
+ getPartitionStore().getPartition(partitionId);
+ for (Vertex<I, V, E, M> vertex : partition) {
+ vertexWriter.writeVertex(vertex);
+ ++verticesWritten;
+
+ // Update status at most every 250k vertices or 15 seconds
+ if (verticesWritten > nextPrintVertices &&
+ System.currentTimeMillis() > nextPrintMsecs) {
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "saveVertices: Saved " + verticesWritten + " out of " +
+ partition.getVertexCount() + " partition vertices, " +
+ "on partition " + partitionIndex +
+ " out of " + numPartitions);
+ nextPrintMsecs = System.currentTimeMillis() + 15000;
+ nextPrintVertices = verticesWritten + 250000;
+ }
+ }
+ ++partitionIndex;
+ }
+ vertexWriter.close(getContext()); // the temp results are saved now
+ return null;
+ }
+ };
}
- getPartitionStore().putPartition(partition);
- getContext().progress();
- ++partitionIndex;
- }
- vertexWriter.close(getContext()); // the temp results are saved now
+ };
+ ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+ "save-vertices-%d", getContext());
+
LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
"saveVertices: Done saving vertices.");
// YARN: must complete the commit the "task" output, Hadoop isn't there.
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 9297ac1..4a1705b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -20,6 +20,8 @@ package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -35,7 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
*/
public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements InputSplitsCallableFactory<I, V, E, M> {
+ implements CallableFactory<VertexEdgeCount> {
/** Mapper context. */
private final Mapper<?, ?, ?, ?>.Context context;
/** Graph state. */
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
deleted file mode 100644
index cdc6543..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Factory class for creating {@link InputSplitsCallable}s.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public interface InputSplitsCallableFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Return a newly-created {@link InputSplitsCallable}.
- *
- * @param threadId Id of input split thread
- * @return A new {@link InputSplitsCallable}
- */
- InputSplitsCallable<I, V, E, M> newCallable(int threadId);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/621a022c/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index aebca81..4eff3b8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -20,6 +20,8 @@ package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -35,7 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
*/
public class VertexInputSplitsCallableFactory<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements InputSplitsCallableFactory<I, V, E, M> {
+ implements CallableFactory<VertexEdgeCount> {
/** Mapper context. */
private final Mapper<?, ?, ?, ?>.Context context;
/** Graph state. */