You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/05/18 17:35:30 UTC
svn commit: r1340131 [2/2] - in /incubator/hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/sync/
core/src/main/java/org/apache/hama/util/
core/src/test/java/org/apache/hama/bsp/ examples/src/main/java/org/a...
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java Fri May 18 15:35:28 2012
@@ -23,12 +23,14 @@ public class MinAggregator extends Abstr
int min = Integer.MAX_VALUE;
+ @Override
public void aggregate(IntWritable value) {
if (value.get() < min) {
min = value.get();
}
}
+ @Override
public IntWritable getValue() {
return new IntWritable(min);
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri May 18 15:35:28 2012
@@ -23,24 +23,24 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPPeer;
-public abstract class Vertex<M extends Writable> implements VertexInterface<M> {
+public abstract class Vertex<ID_TYPE extends Writable, MSG_TYPE extends Writable, EDGE_VALUE_TYPE extends Writable>
+ implements VertexInterface<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> {
- private M value;
- private String vertexID;
- protected GraphJobRunner runner;
- protected BSPPeer<?, ?, ?, ?, MapWritable> peer;
- public List<Edge> edges;
+ private MSG_TYPE value;
+ private ID_TYPE vertexID;
+ protected GraphJobRunner<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> runner;
+ protected BSPPeer<VertexWritable<ID_TYPE, MSG_TYPE>, VertexArrayWritable, Writable, Writable, Writable> peer;
+ public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges;
public Configuration getConf() {
return peer.getConfiguration();
}
@Override
- public String getVertexID() {
+ public ID_TYPE getVertexID() {
return vertexID;
}
@@ -49,17 +49,17 @@ public abstract class Vertex<M extends W
}
@Override
- public void sendMessage(Edge e, M msg) throws IOException {
+ public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg)
+ throws IOException {
MapWritable message = new MapWritable();
- message.put(new Text(e.getName()), msg);
-
- peer.send(e.getDestVertexID(), message);
+ message.put(e.getDestinationVertexID(), msg);
+ peer.send(e.getDestinationPeerName(), message);
}
@Override
- public void sendMessageToNeighbors(M msg) throws IOException {
- final List<Edge> outEdges = this.getOutEdges();
- for (Edge e : outEdges) {
+ public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException {
+ final List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> outEdges = this.getOutEdges();
+ for (Edge<ID_TYPE, EDGE_VALUE_TYPE> e : outEdges) {
sendMessage(e, msg);
}
}
@@ -70,21 +70,21 @@ public abstract class Vertex<M extends W
}
@Override
- public List<Edge> getOutEdges() {
+ public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges() {
return edges;
}
@Override
- public M getValue() {
+ public MSG_TYPE getValue() {
return value;
}
@Override
- public void setValue(M value) {
+ public void setValue(MSG_TYPE value) {
this.value = value;
}
- public void setVertexID(String vertexID) {
+ public void setVertexID(ID_TYPE vertexID) {
this.vertexID = vertexID;
}
@@ -97,8 +97,8 @@ public abstract class Vertex<M extends W
* was configured or not returned a result.
*/
@SuppressWarnings("unchecked")
- public M getLastAggregatedValue() {
- return (M) runner.getLastAggregatedValue();
+ public MSG_TYPE getLastAggregatedValue() {
+ return (MSG_TYPE) runner.getLastAggregatedValue();
}
/**
@@ -113,6 +113,7 @@ public abstract class Vertex<M extends W
return peer.getNumPeers();
}
+ @Override
public long getNumVertices() {
return runner.getNumberVertices();
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Fri May 18 15:35:28 2012
@@ -24,7 +24,16 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-public interface VertexInterface<MSGTYPE extends Writable> {
+/**
+ * The vertex interface.
+ *
+ * @param <ID_TYPE> this type must be writable and should also implement equals
+ * and hashcode.
+ * @param <MSG_TYPE> the type used for messaging, usually the value of a vertex.
+ * @param <EDGE_VALUE_TYPE> the type used for storing edge values, usually the
+ * value of an edge.
+ */
+public interface VertexInterface<ID_TYPE extends Writable, MSG_TYPE extends Writable, EDGE_VALUE_TYPE extends Writable> {
/**
* Used to setup a vertex.
@@ -32,22 +41,23 @@ public interface VertexInterface<MSGTYPE
public void setup(Configuration conf);
/** @return the unique identification for the vertex. */
- public String getVertexID();
+ public ID_TYPE getVertexID();
/** @return the number of vertices in the input graph. */
public long getNumVertices();
/** The user-defined function */
- public void compute(Iterator<MSGTYPE> messages) throws IOException;
+ public void compute(Iterator<MSG_TYPE> messages) throws IOException;
/** @return a list of outgoing edges of this vertex in the input graph. */
- public List<Edge> getOutEdges();
+ public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges();
/** Sends a message to another vertex. */
- public void sendMessage(Edge e, MSGTYPE msg) throws IOException;
+ public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg)
+ throws IOException;
/** Sends a message to neighbors */
- public void sendMessageToNeighbors(MSGTYPE msg) throws IOException;
+ public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException;
/** @return the superstep number of the current superstep (starting from 0). */
public long getSuperstepCount();
@@ -57,13 +67,13 @@ public interface VertexInterface<MSGTYPE
*
* @param value
*/
- public void setValue(MSGTYPE value);
+ public void setValue(MSG_TYPE value);
/**
* Gets the vertex value
*
* @return value
*/
- public MSGTYPE getValue();
+ public MSG_TYPE getValue();
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java Fri May 18 15:35:28 2012
@@ -21,61 +21,104 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
-public class VertexWritable implements Writable,
- WritableComparable<VertexWritable> {
+public class VertexWritable<VERTEX_ID, VERTEX_VALUE> implements
+ WritableComparable<VertexWritable<VERTEX_ID, VERTEX_VALUE>>, Configurable {
- public String name;
- public int weight;
+ /**
+ * This field is static because it doesn't need to be an instance variable. It
+ * is written in upper case, because it is considered constant per launched
+ * process.
+ */
+ public static Configuration CONFIGURATION;
+
+ VERTEX_ID vertexId;
+ VERTEX_VALUE value;
+ Class<VERTEX_ID> idCls;
+ Class<VERTEX_VALUE> valCls;
public VertexWritable() {
super();
}
- public VertexWritable(String name) {
- super();
- this.name = name;
- this.weight = 0;
+ @SuppressWarnings("unchecked")
+ public VertexWritable(VERTEX_ID name, Class<VERTEX_ID> idCls) {
+ this.vertexId = name;
+ this.value = (VERTEX_VALUE) new IntWritable(0);
+ this.idCls = idCls;
+ this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value);
}
+ @SuppressWarnings("unchecked")
public VertexWritable(int weight, String name) {
- super();
- this.name = name;
- this.weight = weight;
+ this.vertexId = (VERTEX_ID) new Text(name);
+ this.value = (VERTEX_VALUE) new IntWritable(weight);
+ this.idCls = org.apache.hadoop.util.ReflectionUtils.getClass(vertexId);
+ this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value);
+ }
+
+ @SuppressWarnings("unchecked")
+ public VertexWritable(String name) {
+ this.vertexId = (VERTEX_ID) new Text(name);
+ this.value = (VERTEX_VALUE) NullWritable.get();
+ this.idCls = org.apache.hadoop.util.ReflectionUtils.getClass(vertexId);
+ this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value);
+ }
+
+ public VertexWritable(VERTEX_VALUE weight, VERTEX_ID name,
+ Class<VERTEX_ID> idCls, Class<VERTEX_VALUE> valCls) {
+ this.vertexId = name;
+ this.value = weight;
+ this.idCls = idCls;
+ this.valCls = valCls;
}
- public String getName() {
- return name;
+ public VERTEX_ID getVertexId() {
+ return vertexId;
}
- public int getWeight() {
- return weight;
+ public VERTEX_VALUE getVertexValue() {
+ return value;
}
@Override
public String toString() {
- return getName();
+ return getVertexId().toString();
}
+ @SuppressWarnings("unchecked")
@Override
public void readFields(DataInput in) throws IOException {
- this.name = in.readUTF();
- this.weight = in.readInt();
+ try {
+ idCls = (Class<VERTEX_ID>) CONFIGURATION.getClassByName(in.readUTF());
+ valCls = (Class<VERTEX_VALUE>) CONFIGURATION.getClassByName(in.readUTF());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ vertexId = (VERTEX_ID) ObjectWritable.readObject(in, CONFIGURATION);
+ value = (VERTEX_VALUE) ObjectWritable.readObject(in, CONFIGURATION);
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeUTF(name);
- out.writeInt(weight);
+ out.writeUTF(idCls.getName());
+ out.writeUTF(valCls.getName());
+ ObjectWritable.writeObject(out, vertexId, idCls, CONFIGURATION);
+ ObjectWritable.writeObject(out, value, valCls, CONFIGURATION);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((vertexId == null) ? 0 : vertexId.hashCode());
return result;
}
@@ -87,19 +130,32 @@ public class VertexWritable implements W
return false;
if (getClass() != obj.getClass())
return false;
- VertexWritable other = (VertexWritable) obj;
- if (name == null) {
- if (other.name != null)
+ @SuppressWarnings("unchecked")
+ VertexWritable<VERTEX_ID, VERTEX_VALUE> other = (VertexWritable<VERTEX_ID, VERTEX_VALUE>) obj;
+ if (vertexId == null) {
+ if (other.vertexId != null)
return false;
- } else if (!name.equals(other.name))
+ } else if (!vertexId.equals(other.vertexId))
return false;
return true;
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compareTo(VertexWritable<VERTEX_ID, VERTEX_VALUE> o) {
+ VertexWritable<VERTEX_ID, VERTEX_VALUE> that = o;
+ return ((Comparable<VertexWritable<VERTEX_ID, VERTEX_VALUE>>) this.vertexId)
+ .compareTo((VertexWritable<VERTEX_ID, VERTEX_VALUE>) that.vertexId);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ VertexWritable.CONFIGURATION = conf;
+ }
+
@Override
- public int compareTo(VertexWritable o) {
- VertexWritable that = (VertexWritable) o;
- return this.name.compareTo(that.name);
+ public Configuration getConf() {
+ return CONFIGURATION;
}
}