You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/03/30 09:14:14 UTC
git commit: updated refs/heads/trunk to 67f5f74
Updated Branches:
refs/heads/trunk da2a68708 -> 67f5f7475
GIRAPH-600: Create an option to do output during computation (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/67f5f747
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/67f5f747
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/67f5f747
Branch: refs/heads/trunk
Commit: 67f5f747578b0e67944da2fea9e4a3d3a22c4e09
Parents: da2a687
Author: Maja Kabiljo <ma...@maja-mbp.local>
Authored: Sat Mar 30 01:13:29 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.local>
Committed: Sat Mar 30 01:13:29 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../giraph/bsp/CentralizedServiceWorker.java | 8 +
.../apache/giraph/conf/GiraphConfiguration.java | 40 +++++
.../org/apache/giraph/conf/GiraphConstants.java | 16 ++
.../conf/ImmutableClassesGiraphConfiguration.java | 24 +++
.../org/apache/giraph/graph/ComputeCallable.java | 15 ++-
.../org/apache/giraph/graph/GraphTaskManager.java | 3 +-
.../org/apache/giraph/io/SimpleVertexWriter.java | 45 ++++++
.../java/org/apache/giraph/io/VertexWriter.java | 13 +--
.../MultiThreadedSuperstepOutput.java | 119 +++++++++++++++
.../io/superstep_output/NoOpSuperstepOutput.java | 57 +++++++
.../io/superstep_output/SuperstepOutput.java | 57 +++++++
.../SynchronizedSuperstepOutput.java | 95 ++++++++++++
.../giraph/io/superstep_output/package-info.java | 21 +++
.../org/apache/giraph/worker/BspServiceWorker.java | 21 +++-
15 files changed, 521 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 8263026..e0a82a6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-600: Create an option to do output during computation (majakabiljo)
+
GIRAPH-599: Hive IO dependency issues with some Hadoop profiles (nitay via majakabiljo)
GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta)
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 56b5d03..1c7bde4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
@@ -223,6 +224,13 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
void prepareSuperstep();
/**
+ * Get the superstep output of this worker, used for writing vertices
+ * out during the computation
+ *
+ * @return SuperstepOutput of this worker
+ */
+ SuperstepOutput<I, V, E> getSuperstepOutput();
+
+ /**
* Clean up the service (no calls may be issued after this)
*
* @param finishedSuperstepStats Finished supestep stats
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 963b82a..040c26f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -262,6 +262,46 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Check if output should be done during computation
+ *
+ * @return True iff output should be done during computation
+ */
+ public final boolean doOutputDuringComputation() {
+ return DO_OUTPUT_DURING_COMPUTATION.get(this);
+ }
+
+ /**
+ * Enable or disable writing vertices out during computation, instead of
+ * saving them at the end of the application.
+ *
+ * @param doOutputDuringComputation True iff vertices should be written
+ * out during computation
+ */
+ public final void setDoOutputDuringComputation(
+ final boolean doOutputDuringComputation) {
+ DO_OUTPUT_DURING_COMPUTATION.set(this, doOutputDuringComputation);
+ }
+
+ /**
+ * Check if VertexOutputFormat is thread-safe
+ *
+ * @return True iff VertexOutputFormat is thread-safe
+ */
+ public final boolean vertexOutputFormatThreadSafe() {
+ return VERTEX_OUTPUT_FORMAT_THREAD_SAFE.get(this);
+ }
+
+ /**
+ * Declare whether the selected VertexOutputFormat is thread-safe, i.e.
+ * whether several of its vertex writers may be created and written to in
+ * parallel.
+ *
+ * @param vertexOutputFormatThreadSafe True iff selected VertexOutputFormat
+ * is thread-safe
+ */
+ public final void setVertexOutputFormatThreadSafe(
+ final boolean vertexOutputFormatThreadSafe) {
+ VERTEX_OUTPUT_FORMAT_THREAD_SAFE.set(this, vertexOutputFormatThreadSafe);
+ }
+
+ /**
* Set the vertex combiner class (optional)
*
* @param vertexCombinerClass Determines how vertex messages are combined
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c5b9b93..eaa8363 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -124,6 +124,22 @@ public interface GiraphConstants {
ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
ClassConfOption.create("giraph.vertexOutputFormatClass", null,
VertexOutputFormat.class);
+ /**
+ * If you use this option, instead of saving vertices at the end of the
+ * application, each vertex will be written out with the VertexWriter right
+ * after its vertex.compute() is called.
+ * NOTE: This feature doesn't work well with checkpointing - if you restart
+ * from a checkpoint you won't have any output from previous supersteps.
+ */
+ BooleanConfOption DO_OUTPUT_DURING_COMPUTATION =
+ new BooleanConfOption("giraph.doOutputDuringComputation", false);
+ /**
+ * Vertex output format thread-safe - if your VertexOutputFormat allows
+ * several vertex writers to be created and written to in parallel,
+ * you should set this to true. It is consulted when
+ * giraph.doOutputDuringComputation is enabled.
+ */
+ BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE =
+ new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false);
/** Output Format Path (for Giraph-on-YARN) */
String GIRAPH_OUTPUT_DIR = "giraph.output.dir";
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 76f6105..e290c57 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -31,6 +31,10 @@ import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
+import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
+import org.apache.giraph.io.superstep_output.SuperstepOutput;
+import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
@@ -50,6 +54,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Progressable;
import static org.apache.giraph.conf.GiraphConstants.USE_UNSAFE_SERIALIZATION;
@@ -201,6 +206,25 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Create the proper superstep output, based on the configuration settings.
+ *
+ * @param context Mapper context
+ * @return SuperstepOutput
+ */
+ public SuperstepOutput<I, V, E> createSuperstepOutput(
+ Mapper<?, ?, ?, ?>.Context context) {
+ // Output happens only at the end of the application: nothing per vertex
+ if (!doOutputDuringComputation()) {
+ return new NoOpSuperstepOutput<I, V, E>();
+ }
+ // Thread-safe formats can serve a pool of writers; otherwise a single
+ // writer is shared by all compute threads behind a lock
+ return vertexOutputFormatThreadSafe() ?
+ new MultiThreadedSuperstepOutput<I, V, E>(this, context) :
+ new SynchronizedSuperstepOutput<I, V, E>(this, context);
+ }
+
+ /**
* Does the job have an {@link EdgeInputFormat}?
*
* @return True iff an {@link EdgeInputFormat} has been specified.
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 4840471..51ed4f6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -22,6 +22,7 @@ import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.SimpleVertexWriter;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
@@ -89,6 +90,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
/** Sends the messages (unique per Callable) */
private WorkerClientRequestProcessor<I, V, E, M>
workerClientRequestProcessor;
+ /** VertexWriter for this ComputeCallable */
+ private SimpleVertexWriter<I, V, E> vertexWriter;
/** Get the start time in nanos */
private final long startNanos = TIME.getNanoseconds();
@@ -143,6 +146,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
context, graphState.getGraphTaskManager(), workerClientRequestProcessor,
aggregatorUsage);
+ vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
+
List<PartitionStats> partitionStatsList = Lists.newArrayList();
while (!partitionIdQueue.isEmpty()) {
Integer partitionId = partitionIdQueue.poll();
@@ -165,11 +170,17 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
} catch (IOException e) {
throw new IllegalStateException("call: Caught unexpected IOException," +
" failing.", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("call: Caught unexpected " +
+ "InterruptedException, failing.", e);
} finally {
serviceWorker.getPartitionStore().putPartition(partition);
}
}
+ // Return VertexWriter after the usage
+ serviceWorker.getSuperstepOutput().returnVertexWriter(vertexWriter);
+
if (LOG.isInfoEnabled()) {
float seconds = Times.getNanosSince(TIME, startNanos) /
Time.NS_PER_SECOND_AS_FLOAT;
@@ -193,7 +204,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
* @return Partition stats for this computed partition
*/
private PartitionStats computePartition(Partition<I, V, E, M> partition)
- throws IOException {
+ throws IOException, InterruptedException {
PartitionStats partitionStats =
new PartitionStats(partition.getId(), 0, 0, 0, 0);
// Make sure this is thread-safe across runs
@@ -225,6 +236,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
}
// Need to unwrap the mutated edges (possibly)
vertex.unwrapMutableEdges();
+ // Write vertex to superstep output (no-op if it is not used)
+ vertexWriter.writeVertex(vertex);
// Need to save the vertex changes (possibly)
partition.saveVertex(vertex);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 57f7dff..3ae5ed3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -288,9 +288,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/**
* Handle post-application callbacks.
*/
- private void postApplication() {
+ private void postApplication() throws IOException, InterruptedException {
GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
serviceWorker.getWorkerContext().postApplication();
+ serviceWorker.getSuperstepOutput().postApplication();
postAppTimerContext.stop();
context.progress();
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
new file mode 100644
index 0000000..e4c3496
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Interface which can only write vertices. It is the part of
+ * {@link VertexWriter} needed by code which writes vertices but does not
+ * initialize or close the writer.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public interface SimpleVertexWriter<I extends WritableComparable,
+ V extends Writable, E extends Writable> {
+ /**
+ * Writes the next vertex and associated data
+ *
+ * @param vertex Vertex to write
+ * @throws IOException if the implementation fails to write the vertex
+ * @throws InterruptedException if interrupted while writing
+ */
+ void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+ InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
index 38c5548..2aa3a71 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
@@ -20,7 +20,6 @@ package org.apache.giraph.io;
import java.io.IOException;
-import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -34,7 +33,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
*/
@SuppressWarnings("rawtypes")
public interface VertexWriter<I extends WritableComparable, V extends Writable,
- E extends Writable> {
+ E extends Writable> extends SimpleVertexWriter<I, V, E> {
/**
* Use the context to setup writing the vertices.
* Guaranteed to be called prior to any other function.
@@ -47,16 +46,6 @@ public interface VertexWriter<I extends WritableComparable, V extends Writable,
InterruptedException;
/**
- * Writes the next vertex and associated data
- *
- * @param vertex set the properties of this vertex
- * @throws IOException
- * @throws InterruptedException
- */
- void writeVertex(Vertex<I, V, E, ?> vertex)
- throws IOException, InterruptedException;
-
- /**
* Close this {@link VertexWriter} to future operations.
*
* @param context the context of the task
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
new file mode 100644
index 0000000..a09f915
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.superstep_output;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
+ * thread-safe. Keeps a pool of vertex writers: every caller of
+ * {@link #getVertexWriter()} gets a writer which nobody else currently
+ * holds, and returned writers are reused. All pooled writers are closed in
+ * {@link #postApplication()}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class MultiThreadedSuperstepOutput<I extends WritableComparable,
+ V extends Writable, E extends Writable> implements
+ SuperstepOutput<I, V, E> {
+ /** Mapper context */
+ private final Mapper<?, ?, ?, ?>.Context context;
+ /** Vertex output format, used to get new vertex writers */
+ private final VertexOutputFormat<I, V, E> vertexOutputFormat;
+ /**
+ * List of returned vertex writers, these can be reused and will all be
+ * closed at the end of the application
+ */
+ private final List<VertexWriter<I, V, E>> availableVertexWriters;
+ /** Vertex writers which were created by this class and are in use */
+ private final Set<VertexWriter<I, V, E>> occupiedVertexWriters;
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration
+ * @param context Mapper context
+ */
+ public MultiThreadedSuperstepOutput(
+ ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
+ Mapper<?, ?, ?, ?>.Context context) {
+ vertexOutputFormat = conf.createVertexOutputFormat();
+ this.context = context;
+ availableVertexWriters = Lists.newArrayList();
+ occupiedVertexWriters = Sets.newHashSet();
+ }
+
+ @Override
+ public synchronized SimpleVertexWriter<I, V, E> getVertexWriter() {
+ VertexWriter<I, V, E> vertexWriter;
+ if (availableVertexWriters.isEmpty()) {
+ // No idle writer to reuse, so create and initialize a new one
+ try {
+ vertexWriter = vertexOutputFormat.createVertexWriter(context);
+ vertexWriter.initialize(context);
+ } catch (IOException e) {
+ throw new IllegalStateException("getVertexWriter: " +
+ "IOException occurred", e);
+ } catch (InterruptedException e) {
+ // Preserve the interrupt status for the calling thread
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("getVertexWriter: " +
+ "InterruptedException occurred", e);
+ }
+ } else {
+ vertexWriter =
+ availableVertexWriters.remove(availableVertexWriters.size() - 1);
+ }
+ occupiedVertexWriters.add(vertexWriter);
+ return vertexWriter;
+ }
+
+ @Override
+ public synchronized void returnVertexWriter(
+ SimpleVertexWriter<I, V, E> vertexWriter) {
+ // Check membership before casting, so that a writer which did not come
+ // from this pool fails with IllegalStateException, not ClassCastException
+ if (!occupiedVertexWriters.remove(vertexWriter)) {
+ throw new IllegalStateException("returnVertexWriter: " +
+ "Returned vertex writer which is not currently occupied!");
+ }
+ // Only VertexWriters are ever added to occupiedVertexWriters
+ availableVertexWriters.add((VertexWriter<I, V, E>) vertexWriter);
+ }
+
+ @Override
+ public synchronized void postApplication() throws IOException,
+ InterruptedException {
+ if (!occupiedVertexWriters.isEmpty()) {
+ throw new IllegalStateException("postApplication: " +
+ occupiedVertexWriters.size() +
+ " vertex writers were not returned!");
+ }
+ for (VertexWriter<I, V, E> vertexWriter : availableVertexWriters) {
+ vertexWriter.close(context);
+ }
+ // The pooled writers are closed now; drop them so that they are neither
+ // handed out again by getVertexWriter() nor closed a second time
+ availableVertexWriters.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
new file mode 100644
index 0000000..82684b2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.superstep_output;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * {@link SuperstepOutput} used when output is not done during computation.
+ * Every method does nothing: the writers it hands out discard all vertices,
+ * and there is nothing to release or close.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class NoOpSuperstepOutput<I extends WritableComparable,
+ V extends Writable, E extends Writable> implements
+ SuperstepOutput<I, V, E> {
+ @Override
+ public SimpleVertexWriter<I, V, E> getVertexWriter() {
+ // A fresh discarding writer for every caller
+ return new DiscardingVertexWriter<I, V, E>();
+ }
+
+ @Override
+ public void returnVertexWriter(
+ SimpleVertexWriter<I, V, E> vertexWriter) {
+ // Nothing was acquired, so nothing has to be released
+ }
+
+ @Override
+ public void postApplication() {
+ // No writer was ever opened, so nothing has to be closed
+ }
+
+ /**
+ * Vertex writer which ignores every vertex it is given.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+ private static class DiscardingVertexWriter<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ implements SimpleVertexWriter<I, V, E> {
+ @Override
+ public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+ InterruptedException {
+ // Intentionally empty
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java
new file mode 100644
index 0000000..107039a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SuperstepOutput.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.superstep_output;
+
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Interface for outputting vertices during the computation, as opposed to
+ * saving them at the end of the application.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public interface SuperstepOutput<I extends WritableComparable,
+ V extends Writable, E extends Writable> {
+
+ /**
+ * Get a vertex writer. Every writer obtained here has to be given back
+ * through {@link #returnVertexWriter(SimpleVertexWriter)} after usage,
+ * in order for it to be properly closed.
+ *
+ * @return SimpleVertexWriter to write vertices with
+ */
+ SimpleVertexWriter<I, V, E> getVertexWriter();
+
+ /**
+ * Return the Writer after usage, which you got by calling
+ * {@link #getVertexWriter()}
+ *
+ * @param vertexWriter SimpleVertexWriter which you are returning
+ */
+ void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter);
+
+ /**
+ * Finalize this output at the end of the application. Implementations may
+ * require that all vertex writers have been returned before this call.
+ *
+ * @throws IOException if closing the underlying output fails
+ * @throws InterruptedException if interrupted while closing the output
+ */
+ void postApplication() throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
new file mode 100644
index 0000000..9617b24
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.superstep_output;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+/**
+ * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
+ * not thread-safe. A single vertex writer is created up front and shared by
+ * all callers; calls to its writeVertex are serialized.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class SynchronizedSuperstepOutput<I extends WritableComparable,
+ V extends Writable, E extends Writable> implements
+ SuperstepOutput<I, V, E> {
+ /** Mapper context */
+ private final Mapper<?, ?, ?, ?>.Context context;
+ /** Main vertex writer */
+ private final VertexWriter<I, V, E> vertexWriter;
+ /**
+ * Simple vertex writer, wrapper for {@link #vertexWriter}.
+ * Call to writeVertex is thread-safe.
+ */
+ private final SimpleVertexWriter<I, V, E> simpleVertexWriter;
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration
+ * @param context Mapper context
+ */
+ public SynchronizedSuperstepOutput(
+ ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
+ Mapper<?, ?, ?, ?>.Context context) {
+ this.context = context;
+ try {
+ vertexWriter =
+ conf.createVertexOutputFormat().createVertexWriter(context);
+ vertexWriter.initialize(context);
+ } catch (IOException e) {
+ throw new IllegalStateException("SynchronizedSuperstepOutput: " +
+ "IOException occurred", e);
+ } catch (InterruptedException e) {
+ // Preserve the interrupt status for the constructing thread
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("SynchronizedSuperstepOutput: " +
+ "InterruptedException occurred", e);
+ }
+ // One shared wrapper whose writeVertex locks on itself, so concurrent
+ // callers never use the non-thread-safe writer at the same time
+ simpleVertexWriter = new SimpleVertexWriter<I, V, E>() {
+ @Override
+ public synchronized void writeVertex(
+ Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+ vertexWriter.writeVertex(vertex);
+ }
+ };
+ }
+
+ @Override
+ public SimpleVertexWriter<I, V, E> getVertexWriter() {
+ return simpleVertexWriter;
+ }
+
+ @Override
+ public void returnVertexWriter(SimpleVertexWriter<I, V, E> vertexWriter) {
+ // The single shared writer stays open until postApplication()
+ }
+
+ @Override
+ public void postApplication() throws IOException, InterruptedException {
+ vertexWriter.close(context);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java
new file mode 100644
index 0000000..6523c74
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Classes related to output during computation
+ */
+package org.apache.giraph.io.superstep_output;
http://git-wip-us.apache.org/repos/asf/giraph/blob/67f5f747/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 74c1f87..35db999 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -41,6 +41,7 @@ import org.apache.giraph.graph.InputSplitEvents;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.GlobalStats;
+import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexOutputFormat;
@@ -149,6 +150,9 @@ public class BspServiceWorker<I extends WritableComparable,
/** Handler for aggregators */
private final WorkerAggregatorHandler aggregatorHandler;
+ /** Superstep output */
+ private SuperstepOutput<I, V, E> superstepOutput;
+
/** array of observers to call back to */
private final WorkerObserver[] observers;
@@ -175,7 +179,7 @@ public class BspServiceWorker<I extends WritableComparable,
GraphTaskManager<I, V, E, M> graphTaskManager)
throws IOException, InterruptedException {
super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
- ImmutableClassesGiraphConfiguration conf = getConfiguration();
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration();
partitionExchangeChildrenChanged = new PredicateLock(context);
registerBspEvent(partitionExchangeChildrenChanged);
workerGraphPartitioner =
@@ -193,6 +197,8 @@ public class BspServiceWorker<I extends WritableComparable,
aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
+ superstepOutput = conf.createSuperstepOutput(context);
+
if (conf.isJMapHistogramDumpEnabled()) {
conf.addWorkerObserverClass(JMapHistoDumper.class);
}
@@ -926,6 +932,14 @@ else[HADOOP_NON_SECURE]*/
" not specified -- there will be no saved output");
return;
}
+ if (getConfiguration().doOutputDuringComputation()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("saveVertices: The option for doing output during " +
+ "computation is selected, so there will be no saving of the " +
+ "output in the end of application");
+ }
+ return;
+ }
LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
"saveVertices: Starting to save " + numLocalVertices + " vertices");
@@ -1464,4 +1478,9 @@ else[HADOOP_NON_SECURE]*/
aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
}
}
+
+ @Override
+ public SuperstepOutput<I, V, E> getSuperstepOutput() {
+ // Created once in the constructor from the configuration
+ return this.superstepOutput;
+ }
}