You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2013/02/28 08:53:33 UTC
svn commit: r1451131 - in /hama/trunk: ./
graph/src/main/java/org/apache/hama/graph/
Author: tjungblut
Date: Thu Feb 28 07:53:32 2013
New Revision: 1451131
URL: http://svn.apache.org/r1451131
Log:
[HAMA-737]: Add graph output writer
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1451131&r1=1451130&r2=1451131&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 28 07:53:32 2013
@@ -4,6 +4,7 @@ Release 0.7 (unreleased changes)
NEW FEATURES
+ HAMA-737: Add graph output writer (tjungblut)
HAMA-736: Add one pass graph generator (tjungblut)
HAMA-722: Messaging queue should construct sender and receiver queue. (surajsmenon)
HAMA-721: Added spilling queue with combiner. (surajsmenon)
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java?rev=1451131&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java Thu Feb 28 07:53:32 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPPeer;
+
+/**
+ * The DefaultVertexOutputWriter is the default VertexOutputWriter: it writes
+ * the vertex ID as output key and the vertex value as output value.
+ *
+ * @param <V> the vertexID type.
+ * @param <E> the edge value type.
+ * @param <M> the vertex value type.
+ */
+@SuppressWarnings("rawtypes")
+public class DefaultVertexOutputWriter<V extends WritableComparable, E extends Writable, M extends Writable>
+ implements VertexOutputWriter<V, M, V, E, M> {
+
+ @Override
+ public void setup(Configuration conf) {
+ // nothing to set up: this writer does not read the configuration
+ }
+
+ @Override
+ public void write(Vertex<V, E, M> vertex,
+ BSPPeer<Writable, Writable, V, M, GraphJobMessage> peer)
+ throws IOException {
+ peer.write(vertex.getVertexID(), vertex.getValue());
+ }
+
+}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1451131&r1=1451130&r2=1451131&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Feb 28 07:53:32 2013
@@ -43,6 +43,7 @@ public class GraphJob extends BSPJob {
public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.value.class";
public final static String VERTEX_EDGE_VALUE_CLASS_ATTR = "hama.graph.vertex.edge.value.class";
+ public final static String VERTEX_OUTPUT_WRITER_CLASS_ATTR = "hama.graph.vertex.output.writer.class";
public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
@@ -123,13 +124,24 @@ public class GraphJob extends BSPJob {
* Sets the input reader for parsing the input to vertices.
*/
public void setVertexInputReaderClass(
- Class<? extends VertexInputReader<?, ?, ?, ?, ?>> cls) {
+ @SuppressWarnings("rawtypes") Class<? extends VertexInputReader> cls) {
ensureState(JobState.DEFINE);
conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
RecordConverter.class);
conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
}
+ /**
+ * Sets the output writer for materializing vertices to the output sink. If
+ * not set, {@link DefaultVertexOutputWriter} will be used.
+ */
+ public void setVertexOutputWriterClass(
+ @SuppressWarnings("rawtypes") Class<? extends VertexOutputWriter> cls) {
+ ensureState(JobState.DEFINE);
+ conf.setClass(VERTEX_OUTPUT_WRITER_CLASS_ATTR, cls,
+ VertexOutputWriter.class);
+ }
+
@SuppressWarnings("unchecked")
public Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> getVertexClass() {
return (Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>>) conf
@@ -180,6 +192,10 @@ public class GraphJob extends BSPJob {
Constants.RUNTIME_PARTITION_RECORDCONVERTER) != null,
"Please provide a converter class for your vertex by using GraphJob#setVertexInputReaderClass!");
+ if (this.getConfiguration().get(VERTEX_OUTPUT_WRITER_CLASS_ATTR) == null) {
+ this.setVertexOutputWriterClass(DefaultVertexOutputWriter.class);
+ }
+
// add the default message queue to the sorted one
this.getConfiguration().setClass(MessageManager.QUEUE_TYPE_CLASS,
SortedMessageQueue.class, MessageQueue.class);
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1451131&r1=1451130&r2=1451131&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Feb 28 07:53:32 2013
@@ -85,6 +85,7 @@ public final class GraphJobRunner<V exte
private long iteration;
private AggregationRunner<V, E, M> aggregationRunner;
+ private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
@@ -143,10 +144,10 @@ public final class GraphJobRunner<V exte
public final void cleanup(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
+ vertexOutputWriter.setup(conf);
IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
while (skippingIterator.hasNext()) {
- Vertex<V, E, M> e = skippingIterator.next();
- peer.write(e.getVertexID(), e.getValue());
+ vertexOutputWriter.write(skippingIterator.next(), peer);
}
vertices.cleanup(conf, peer.getTaskId());
}
@@ -336,6 +337,11 @@ public final class GraphJobRunner<V exte
Combiner.class), conf);
}
+ Class<?> outputWriter = conf.getClass(
+ GraphJob.VERTEX_OUTPUT_WRITER_CLASS_ATTR, VertexOutputWriter.class);
+ vertexOutputWriter = (VertexOutputWriter<Writable, Writable, V, E, M>) ReflectionUtils
+ .newInstance(outputWriter);
+
aggregationRunner = new AggregationRunner<V, E, M>();
aggregationRunner.setupAggregators(peer);
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java?rev=1451131&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java Thu Feb 28 07:53:32 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPPeer;
+
+/**
+ * The VertexOutputWriter defines what parts of the vertex shall be written to
+ * the output format.
+ *
+ * @param <KEYOUT> the key output type.
+ * @param <VALUEOUT> the value output type.
+ * @param <V> the vertexID type.
+ * @param <E> the edge value type.
+ * @param <M> the vertex value type.
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexOutputWriter<KEYOUT extends Writable, VALUEOUT extends Writable, V extends WritableComparable, E extends Writable, M extends Writable> {
+
+ /**
+ * Sets up this output writer with the given configuration. This will be
+ * called right before the first call to write.
+ */
+ public void setup(Configuration conf);
+
+ /**
+ * Called for every vertex that shall be written to the output.
+ *
+ * @param vertex the user defined vertex.
+ * @param peer the peer that has methods to write to the output sink.
+ */
+ public void write(Vertex<V, E, M> vertex,
+ BSPPeer<Writable, Writable, KEYOUT, VALUEOUT, GraphJobMessage> peer)
+ throws IOException;
+
+}