You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2013/02/28 08:53:33 UTC

svn commit: r1451131 - in /hama/trunk: ./ graph/src/main/java/org/apache/hama/graph/

Author: tjungblut
Date: Thu Feb 28 07:53:32 2013
New Revision: 1451131

URL: http://svn.apache.org/r1451131
Log:
[HAMA-737]: Add graph output writer

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1451131&r1=1451130&r2=1451131&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 28 07:53:32 2013
@@ -4,6 +4,7 @@ Release 0.7 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-737: Add graph output writer (tjungblut)	   
    HAMA-736: Add one pass graph generator (tjungblut)
    HAMA-722: Messaging queue should construct sender and receiver queue. (surajsmenon)
    HAMA-721: Added spilling queue with combiner. (surajsmenon)

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java?rev=1451131&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java Thu Feb 28 07:53:32 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPPeer;
+
+/**
+ * Default {@link VertexOutputWriter}: writes each vertex to the output sink as
+ * a (vertex ID, vertex value) pair; edge values are not written.
+ * 
+ * @param <V> the vertexID type.
+ * @param <E> the edge value type.
+ * @param <M> the vertex value type.
+ */
+@SuppressWarnings("rawtypes")
+public class DefaultVertexOutputWriter<V extends WritableComparable, E extends Writable, M extends Writable>
+    implements VertexOutputWriter<V, M, V, E, M> {
+
+  @Override
+  public void setup(Configuration conf) {
+    // nothing to configure: this writer keeps no state
+  }
+
+  @Override
+  public void write(Vertex<V, E, M> vertex,
+      BSPPeer<Writable, Writable, V, M, GraphJobMessage> peer)
+      throws IOException {
+    peer.write(vertex.getVertexID(), vertex.getValue());
+  }
+
+}

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1451131&r1=1451130&r2=1451131&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Feb 28 07:53:32 2013
@@ -43,6 +43,7 @@ public class GraphJob extends BSPJob {
   public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.value.class";
   public final static String VERTEX_EDGE_VALUE_CLASS_ATTR = "hama.graph.vertex.edge.value.class";
 
+  public final static String VERTEX_OUTPUT_WRITER_CLASS_ATTR = "hama.graph.vertex.output.writer.class";
   public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
   public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
 
@@ -123,13 +124,24 @@ public class GraphJob extends BSPJob {
    * Sets the input reader for parsing the input to vertices.
    */
   public void setVertexInputReaderClass(
-      Class<? extends VertexInputReader<?, ?, ?, ?, ?>> cls) {
+      @SuppressWarnings("rawtypes") Class<? extends VertexInputReader> cls) {
     ensureState(JobState.DEFINE);
     conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
         RecordConverter.class);
     conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
   }
 
+  /**
+   * Sets the output writer class for materializing vertices to the output sink.
+   * If not set, {@link DefaultVertexOutputWriter} is used by default.
+   */
+  public void setVertexOutputWriterClass(
+      @SuppressWarnings("rawtypes") Class<? extends VertexOutputWriter> cls) {
+    ensureState(JobState.DEFINE);
+    conf.setClass(VERTEX_OUTPUT_WRITER_CLASS_ATTR, cls,
+        VertexOutputWriter.class);
+  }
+
   @SuppressWarnings("unchecked")
   public Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> getVertexClass() {
     return (Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>>) conf
@@ -180,6 +192,10 @@ public class GraphJob extends BSPJob {
                 Constants.RUNTIME_PARTITION_RECORDCONVERTER) != null,
             "Please provide a converter class for your vertex by using GraphJob#setVertexInputReaderClass!");
 
+    if (this.getConfiguration().get(VERTEX_OUTPUT_WRITER_CLASS_ATTR) == null) {
+      this.setVertexOutputWriterClass(DefaultVertexOutputWriter.class);
+    }
+
     // add the default message queue to the sorted one
     this.getConfiguration().setClass(MessageManager.QUEUE_TYPE_CLASS,
         SortedMessageQueue.class, MessageQueue.class);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1451131&r1=1451130&r2=1451131&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Feb 28 07:53:32 2013
@@ -85,6 +85,7 @@ public final class GraphJobRunner<V exte
   private long iteration;
 
   private AggregationRunner<V, E, M> aggregationRunner;
+  private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
 
   private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
 
@@ -143,10 +144,10 @@ public final class GraphJobRunner<V exte
   public final void cleanup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
+    vertexOutputWriter.setup(conf);
     IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
     while (skippingIterator.hasNext()) {
-      Vertex<V, E, M> e = skippingIterator.next();
-      peer.write(e.getVertexID(), e.getValue());
+      vertexOutputWriter.write(skippingIterator.next(), peer);
     }
     vertices.cleanup(conf, peer.getTaskId());
   }
@@ -336,6 +337,11 @@ public final class GraphJobRunner<V exte
               Combiner.class), conf);
     }
 
+    Class<?> outputWriter = conf.getClass(
+        GraphJob.VERTEX_OUTPUT_WRITER_CLASS_ATTR, VertexOutputWriter.class);
+    vertexOutputWriter = (VertexOutputWriter<Writable, Writable, V, E, M>) ReflectionUtils
+        .newInstance(outputWriter);
+
     aggregationRunner = new AggregationRunner<V, E, M>();
     aggregationRunner.setupAggregators(peer);
 

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java?rev=1451131&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexOutputWriter.java Thu Feb 28 07:53:32 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPPeer;
+
+/**
+ * A VertexOutputWriter defines which parts of a vertex are written to the
+ * output format.
+ * 
+ * @param <KEYOUT> the key output type.
+ * @param <VALUEOUT> the value output type.
+ * @param <V> the vertexID type.
+ * @param <E> the edge value type.
+ * @param <M> the vertex value type.
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexOutputWriter<KEYOUT extends Writable, VALUEOUT extends Writable, V extends WritableComparable, E extends Writable, M extends Writable> {
+
+  /**
+   * Sets up this output writer; called right before the first call to write.
+   * @param conf the configuration this writer may read its settings from.
+   */
+  public void setup(Configuration conf);
+
+  /**
+   * Writes one vertex to the output sink; called for every vertex in the list.
+   * @param vertex the user defined vertex.
+   * @param peer the peer that has methods to write to the output sink.
+   * @throws IOException if writing to the output sink fails.
+   */
+  public void write(Vertex<V, E, M> vertex,
+      BSPPeer<Writable, Writable, KEYOUT, VALUEOUT, GraphJobMessage> peer)
+      throws IOException;
+
+}