You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/05/18 17:35:30 UTC

svn commit: r1340131 [2/2] - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/sync/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/ examples/src/main/java/org/a...

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java Fri May 18 15:35:28 2012
@@ -23,12 +23,14 @@ public class MinAggregator extends Abstr
 
   int min = Integer.MAX_VALUE;
 
+  @Override
   public void aggregate(IntWritable value) {
     if (value.get() < min) {
       min = value.get();
     }
   }
 
+  @Override
   public IntWritable getValue() {
     return new IntWritable(min);
   }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri May 18 15:35:28 2012
@@ -23,24 +23,24 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPPeer;
 
-public abstract class Vertex<M extends Writable> implements VertexInterface<M> {
+public abstract class Vertex<ID_TYPE extends Writable, MSG_TYPE extends Writable, EDGE_VALUE_TYPE extends Writable>
+    implements VertexInterface<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> {
 
-  private M value;
-  private String vertexID;
-  protected GraphJobRunner runner;
-  protected BSPPeer<?, ?, ?, ?, MapWritable> peer;
-  public List<Edge> edges;
+  private MSG_TYPE value;
+  private ID_TYPE vertexID;
+  protected GraphJobRunner<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> runner;
+  protected BSPPeer<VertexWritable<ID_TYPE, MSG_TYPE>, VertexArrayWritable, Writable, Writable, Writable> peer;
+  public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges;
 
   public Configuration getConf() {
     return peer.getConfiguration();
   }
 
   @Override
-  public String getVertexID() {
+  public ID_TYPE getVertexID() {
     return vertexID;
   }
 
@@ -49,17 +49,17 @@ public abstract class Vertex<M extends W
   }
 
   @Override
-  public void sendMessage(Edge e, M msg) throws IOException {
+  public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg)
+      throws IOException {
     MapWritable message = new MapWritable();
-    message.put(new Text(e.getName()), msg);
-
-    peer.send(e.getDestVertexID(), message);
+    message.put(e.getDestinationVertexID(), msg);
+    peer.send(e.getDestinationPeerName(), message);
   }
 
   @Override
-  public void sendMessageToNeighbors(M msg) throws IOException {
-    final List<Edge> outEdges = this.getOutEdges();
-    for (Edge e : outEdges) {
+  public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException {
+    final List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> outEdges = this.getOutEdges();
+    for (Edge<ID_TYPE, EDGE_VALUE_TYPE> e : outEdges) {
       sendMessage(e, msg);
     }
   }
@@ -70,21 +70,21 @@ public abstract class Vertex<M extends W
   }
 
   @Override
-  public List<Edge> getOutEdges() {
+  public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges() {
     return edges;
   }
 
   @Override
-  public M getValue() {
+  public MSG_TYPE getValue() {
     return value;
   }
 
   @Override
-  public void setValue(M value) {
+  public void setValue(MSG_TYPE value) {
     this.value = value;
   }
 
-  public void setVertexID(String vertexID) {
+  public void setVertexID(ID_TYPE vertexID) {
     this.vertexID = vertexID;
   }
 
@@ -97,8 +97,8 @@ public abstract class Vertex<M extends W
    * was configured or not returned a result.
    */
   @SuppressWarnings("unchecked")
-  public M getLastAggregatedValue() {
-    return (M) runner.getLastAggregatedValue();
+  public MSG_TYPE getLastAggregatedValue() {
+    return (MSG_TYPE) runner.getLastAggregatedValue();
   }
 
   /**
@@ -113,6 +113,7 @@ public abstract class Vertex<M extends W
     return peer.getNumPeers();
   }
 
+  @Override
   public long getNumVertices() {
     return runner.getNumberVertices();
   }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Fri May 18 15:35:28 2012
@@ -24,7 +24,16 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
-public interface VertexInterface<MSGTYPE extends Writable> {
+/**
+ * The vertex interface.
+ * 
+ * @param <ID_TYPE> the vertex ID type; it must be Writable and should also
+ *          implement equals and hashCode.
+ * @param <MSG_TYPE> the type used for messaging, usually the value of a vertex.
+ * @param <EDGE_VALUE_TYPE> the type used for storing edge values, usually the
+ *          value of an edge.
+ */
+public interface VertexInterface<ID_TYPE extends Writable, MSG_TYPE extends Writable, EDGE_VALUE_TYPE extends Writable> {
 
   /**
    * Used to setup a vertex.
@@ -32,22 +41,23 @@ public interface VertexInterface<MSGTYPE
   public void setup(Configuration conf);
 
   /** @return the unique identification for the vertex. */
-  public String getVertexID();
+  public ID_TYPE getVertexID();
 
   /** @return the number of vertices in the input graph. */
   public long getNumVertices();
 
   /** The user-defined function */
-  public void compute(Iterator<MSGTYPE> messages) throws IOException;
+  public void compute(Iterator<MSG_TYPE> messages) throws IOException;
 
   /** @return a list of outgoing edges of this vertex in the input graph. */
-  public List<Edge> getOutEdges();
+  public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges();
 
   /** Sends a message to another vertex. */
-  public void sendMessage(Edge e, MSGTYPE msg) throws IOException;
+  public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg)
+      throws IOException;
 
   /** Sends a message to neighbors */
-  public void sendMessageToNeighbors(MSGTYPE msg) throws IOException;
+  public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException;
 
   /** @return the superstep number of the current superstep (starting from 0). */
   public long getSuperstepCount();
@@ -57,13 +67,13 @@ public interface VertexInterface<MSGTYPE
    * 
    * @param value
    */
-  public void setValue(MSGTYPE value);
+  public void setValue(MSG_TYPE value);
 
   /**
    * Gets the vertex value
    * 
    * @return value
    */
-  public MSGTYPE getValue();
+  public MSG_TYPE getValue();
 
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java Fri May 18 15:35:28 2012
@@ -21,61 +21,104 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 
-public class VertexWritable implements Writable,
-    WritableComparable<VertexWritable> {
+public class VertexWritable<VERTEX_ID, VERTEX_VALUE> implements
+    WritableComparable<VertexWritable<VERTEX_ID, VERTEX_VALUE>>, Configurable {
 
-  public String name;
-  public int weight;
+  /**
+   * Configuration shared by every instance in the process: the field is static,
+   * so setConf on any instance replaces it for all of them. It is written in
+   * upper case because it is considered constant per launched process.
+   */
+  public static Configuration CONFIGURATION;
+
+  VERTEX_ID vertexId;
+  VERTEX_VALUE value;
+  Class<VERTEX_ID> idCls;
+  Class<VERTEX_VALUE> valCls;
 
   public VertexWritable() {
     super();
   }
 
-  public VertexWritable(String name) {
-    super();
-    this.name = name;
-    this.weight = 0;
+  @SuppressWarnings("unchecked")
+  public VertexWritable(VERTEX_ID name, Class<VERTEX_ID> idCls) {
+    this.vertexId = name;
+    this.value = (VERTEX_VALUE) new IntWritable(0);
+    this.idCls = idCls;
+    this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value);
   }
 
+  @SuppressWarnings("unchecked")
   public VertexWritable(int weight, String name) {
-    super();
-    this.name = name;
-    this.weight = weight;
+    this.vertexId = (VERTEX_ID) new Text(name);
+    this.value = (VERTEX_VALUE) new IntWritable(weight);
+    this.idCls = org.apache.hadoop.util.ReflectionUtils.getClass(vertexId);
+    this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value);
+  }
+
+  @SuppressWarnings("unchecked")
+  public VertexWritable(String name) {
+    this.vertexId = (VERTEX_ID) new Text(name);
+    this.value = (VERTEX_VALUE) NullWritable.get();
+    this.idCls = org.apache.hadoop.util.ReflectionUtils.getClass(vertexId);
+    this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value);
+  }
+
+  public VertexWritable(VERTEX_VALUE weight, VERTEX_ID name,
+      Class<VERTEX_ID> idCls, Class<VERTEX_VALUE> valCls) {
+    this.vertexId = name;
+    this.value = weight;
+    this.idCls = idCls;
+    this.valCls = valCls;
   }
 
-  public String getName() {
-    return name;
+  public VERTEX_ID getVertexId() {
+    return vertexId;
   }
 
-  public int getWeight() {
-    return weight;
+  public VERTEX_VALUE getVertexValue() {
+    return value;
   }
 
   @Override
   public String toString() {
-    return getName();
+    return getVertexId().toString();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void readFields(DataInput in) throws IOException {
-    this.name = in.readUTF();
-    this.weight = in.readInt();
+    try {
+      idCls = (Class<VERTEX_ID>) CONFIGURATION.getClassByName(in.readUTF());
+      valCls = (Class<VERTEX_VALUE>) CONFIGURATION.getClassByName(in.readUTF());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+    vertexId = (VERTEX_ID) ObjectWritable.readObject(in, CONFIGURATION);
+    value = (VERTEX_VALUE) ObjectWritable.readObject(in, CONFIGURATION);
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeUTF(name);
-    out.writeInt(weight);
+    out.writeUTF(idCls.getName());
+    out.writeUTF(valCls.getName());
+    ObjectWritable.writeObject(out, vertexId, idCls, CONFIGURATION);
+    ObjectWritable.writeObject(out, value, valCls, CONFIGURATION);
   }
 
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + ((vertexId == null) ? 0 : vertexId.hashCode());
     return result;
   }
 
@@ -87,19 +130,32 @@ public class VertexWritable implements W
       return false;
     if (getClass() != obj.getClass())
       return false;
-    VertexWritable other = (VertexWritable) obj;
-    if (name == null) {
-      if (other.name != null)
+    @SuppressWarnings("unchecked")
+    VertexWritable<VERTEX_ID, VERTEX_VALUE> other = (VertexWritable<VERTEX_ID, VERTEX_VALUE>) obj;
+    if (vertexId == null) {
+      if (other.vertexId != null)
         return false;
-    } else if (!name.equals(other.name))
+    } else if (!vertexId.equals(other.vertexId))
       return false;
     return true;
   }
 
+  @SuppressWarnings("unchecked")
+  @Override
+  public int compareTo(VertexWritable<VERTEX_ID, VERTEX_VALUE> o) {
+    VertexWritable<VERTEX_ID, VERTEX_VALUE> that = o;
+    return ((Comparable<VertexWritable<VERTEX_ID, VERTEX_VALUE>>) this.vertexId)
+        .compareTo((VertexWritable<VERTEX_ID, VERTEX_VALUE>) that.vertexId);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    VertexWritable.CONFIGURATION = conf;
+  }
+
   @Override
-  public int compareTo(VertexWritable o) {
-    VertexWritable that = (VertexWritable) o;
-    return this.name.compareTo(that.name);
+  public Configuration getConf() {
+    return CONFIGURATION;
   }
 
 }