You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/12/23 05:56:29 UTC
svn commit: r1553069 - in /hama/trunk: CHANGES.txt
graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Author: edwardyoon
Date: Mon Dec 23 04:56:29 2013
New Revision: 1553069
URL: http://svn.apache.org/r1553069
Log:
HAMA-831: Support for multi records with same vertexID (edwardyoon)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1553069&r1=1553068&r2=1553069&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Dec 23 04:56:29 2013
@@ -11,6 +11,7 @@ Release 0.7.0 (unreleased changes)
BUG FIXES
+ HAMA-831: Support for multi records with same vertexID (edwardyoon)
HAMA-830: KMeans and NeuralNetwork doesn't load config file (edwardyoon)
HAMA-812: In local mode BSPJobClient.close throws Exception (Martin Illecker)
HAMA-821: Fix bugs in KMeans example and make output more readable (edwardyoon)
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1553069&r1=1553068&r2=1553069&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Dec 23 04:56:29 2013
@@ -71,11 +71,16 @@ public final class GraphJobRunner<V exte
public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
- public static final Text FLAG_VERTEX_INCREASE = new Text(S_FLAG_VERTEX_INCREASE);
- public static final Text FLAG_VERTEX_DECREASE = new Text(S_FLAG_VERTEX_DECREASE);
- public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(S_FLAG_VERTEX_ALTER_COUNTER);
- public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(S_FLAG_VERTEX_TOTAL_VERTICES);
- public static final Text FLAG_AGGREGATOR_SKIP = new Text(S_FLAG_AGGREGATOR_SKIP);
+ public static final Text FLAG_VERTEX_INCREASE = new Text(
+ S_FLAG_VERTEX_INCREASE);
+ public static final Text FLAG_VERTEX_DECREASE = new Text(
+ S_FLAG_VERTEX_DECREASE);
+ public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(
+ S_FLAG_VERTEX_ALTER_COUNTER);
+ public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(
+ S_FLAG_VERTEX_TOTAL_VERTICES);
+ public static final Text FLAG_AGGREGATOR_SKIP = new Text(
+ S_FLAG_AGGREGATOR_SKIP);
public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class";
public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
@@ -180,18 +185,24 @@ public final class GraphJobRunner<V exte
if (isMasterTask(peer) && iteration == 1) {
MapWritable updatedCnt = new MapWritable();
- updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter())));
+ updatedCnt.put(
+ FLAG_VERTEX_TOTAL_VERTICES,
+ new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
+ .getCounter())));
// send the updates from the master tasks back to the slaves
for (String peerName : peer.getAllPeerNames()) {
peer.send(peerName, new GraphJobMessage(updatedCnt));
}
}
-
+
// this is done in every iteration after the first one
if (isMasterTask(peer) && iteration > 1) {
MapWritable updatedCnt = new MapWritable();
- // send total number of vertices.
- updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter())));
+ // send total number of vertices.
+ updatedCnt.put(
+ FLAG_VERTEX_TOTAL_VERTICES,
+ new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
+ .getCounter())));
// exit if there's no update made
if (globalUpdateCounts == 0) {
updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
@@ -217,8 +228,8 @@ public final class GraphJobRunner<V exte
// now sync
peer.sync();
// now the map message must be read that might be sent from the master
- updated = getAggregationRunner().receiveAggregatedValues(peer
- .getCurrentMessage().getMap(), iteration);
+ updated = getAggregationRunner().receiveAggregatedValues(
+ peer.getCurrentMessage().getMap(), iteration);
// set the first vertex message back to the message it had before sync
firstVertexMessage = peer.getCurrentMessage();
}
@@ -281,7 +292,8 @@ public final class GraphJobRunner<V exte
}
vertices.finishSuperstep();
- getAggregationRunner().sendAggregatorValues(peer, activeVertices, this.changedVertexCnt);
+ getAggregationRunner().sendAggregatorValues(peer, activeVertices,
+ this.changedVertexCnt);
iteration++;
}
@@ -305,15 +317,15 @@ public final class GraphJobRunner<V exte
if (conf.getBoolean("hama.check.missing.vertex", true)) {
if (comparision < 0) {
throw new IllegalArgumentException(
- "Messages must never be behind the vertex in ID! Current Message ID: "
- + firstMessageId + " vs. " + vertex.getVertexID());
- }
+ "Messages must never be behind the vertex in ID! Current Message ID: "
+ + firstMessageId + " vs. " + vertex.getVertexID());
+ }
} else {
while (comparision < 0) {
- VertexMessageIterable<V, M> messageIterable = new VertexMessageIterable<V, M>(currentMessage,
- firstMessageId, peer);
+ VertexMessageIterable<V, M> messageIterable = new VertexMessageIterable<V, M>(
+ currentMessage, firstMessageId, peer);
currentMessage = messageIterable.getOverflowMessage();
- firstMessageId = (V)currentMessage.getVertexId();
+ firstMessageId = (V) currentMessage.getVertexId();
comparision = firstMessageId.compareTo(vertex.getVertexID());
}
}
@@ -383,7 +395,9 @@ public final class GraphJobRunner<V exte
setAggregationRunner(new AggregationRunner<V, E, M>());
getAggregationRunner().setupAggregators(peer);
- Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<? extends VerticesInfo<V, E, M>>) conf.getClass("hama.graph.vertices.info", ListVerticesInfo.class, VerticesInfo.class);
+ Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<? extends VerticesInfo<V, E, M>>) conf
+ .getClass("hama.graph.vertices.info", ListVerticesInfo.class,
+ VerticesInfo.class);
vertices = ReflectionUtils.newInstance(verticesInfoClass);
vertices.init(this, conf, peer.getTaskId());
}
@@ -424,25 +438,44 @@ public final class GraphJobRunner<V exte
// our VertexInputReader ensures incoming vertices are sorted by their ID
Vertex<V, E, M> vertex = GraphJobRunner
.<V, E, M> newVertexInstance(VERTEX_CLASS);
+ Vertex<V, E, M> currentVertex = GraphJobRunner
+ .<V, E, M> newVertexInstance(VERTEX_CLASS);
+
KeyValuePair<Writable, Writable> record = null;
KeyValuePair<Writable, Writable> converted = null;
+
while ((record = peer.readNext()) != null) {
converted = converter.convertRecord(record, conf);
- vertex = (Vertex<V, E, M>) converted.getKey();
- vertex.setRunner(this);
- vertex.setup(conf);
+ currentVertex = (Vertex<V, E, M>) converted.getKey();
- if (selfReference) {
- vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
- }
+ if (vertex.getVertexID() == null) {
+ vertex = currentVertex;
+ } else {
+ if (vertex.getVertexID().equals(currentVertex.getVertexID())) {
+ for (Edge<V, E> edge : currentVertex.getEdges()) {
+ vertex.addEdge(edge);
+ }
+ } else {
+ vertex.setRunner(this);
+ vertex.setup(conf);
- vertices.addVertex(vertex);
+ if (selfReference) {
+ vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
+ }
- // Reinitializing vertex object for memory based implementations of
- // VerticesInfo
- vertex = GraphJobRunner.<V, E, M> newVertexInstance(VERTEX_CLASS);
- vertex.setRunner(this);
+ vertices.addVertex(vertex);
+ vertex = currentVertex;
+ }
+ }
}
+ // add last vertex.
+ vertex.setRunner(this);
+ vertex.setup(conf);
+ if (selfReference) {
+ vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
+ }
+ vertices.addVertex(vertex);
+
vertices.finishAdditions();
// finish the "superstep" because we have written a new file here
vertices.finishSuperstep();
@@ -454,32 +487,37 @@ public final class GraphJobRunner<V exte
/**
* Add new vertex into memory of each peer.
- * @throws IOException
+ *
+ * @throws IOException
*/
private void addVertex(Vertex<V, E, M> vertex) throws IOException {
vertex.setRunner(this);
vertex.setup(conf);
-
+
if (conf.getBoolean("hama.graph.self.ref", false)) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
-
- LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer " + peer.getPeerName());
+
+ LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer "
+ + peer.getPeerName());
vertices.addVertex(vertex);
}
/**
* Remove vertex from this peer.
- * @throws IOException
+ *
+ * @throws IOException
*/
private void removeVertex(V vertexID) {
vertices.removeVertex(vertexID);
- LOG.debug("Removed VertexID: " + vertexID + " in peer " + peer.getPeerName());
+ LOG.debug("Removed VertexID: " + vertexID + " in peer "
+ + peer.getPeerName());
}
/**
* After all inserts are done, we must finalize the VertexInfo data structure.
- * @throws IOException
+ *
+ * @throws IOException
*/
private void finishAdditions() throws IOException {
vertices.finishAdditions();
@@ -489,7 +527,8 @@ public final class GraphJobRunner<V exte
/**
* After all removals are done, we must finalize the VertexInfo data structure.
- * @throws IOException
+ *
+ * @throws IOException
*/
private void finishRemovals() throws IOException {
vertices.finishRemovals();
@@ -534,8 +573,8 @@ public final class GraphJobRunner<V exte
throws IOException, SyncException, InterruptedException {
GraphJobMessage msg = null;
boolean dynamicAdditions = false;
- boolean dynamicRemovals = false;
-
+ boolean dynamicRemovals = false;
+
while ((msg = peer.getCurrentMessage()) != null) {
// either this is a vertex message or a directive that must be read
// as map
@@ -559,8 +598,8 @@ public final class GraphJobRunner<V exte
(M) e.getValue());
} else if (getAggregationRunner().isEnabled()
&& vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
- getAggregationRunner().masterReadAggregatedIncrementalValue(vertexID,
- (M) e.getValue());
+ getAggregationRunner().masterReadAggregatedIncrementalValue(
+ vertexID, (M) e.getValue());
} else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
dynamicAdditions = true;
addVertex((Vertex<V, E, M>) e.getValue());
@@ -571,15 +610,20 @@ public final class GraphJobRunner<V exte
this.numberVertices = ((LongWritable) e.getValue()).get();
} else if (FLAG_VERTEX_ALTER_COUNTER.equals(vertexID)) {
if (isMasterTask(peer)) {
- peer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(((LongWritable) e.getValue()).get());
+ peer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(
+ ((LongWritable) e.getValue()).get());
} else {
- throw new UnsupportedOperationException("A message to increase vertex count is in a wrong place: " + peer);
+ throw new UnsupportedOperationException(
+ "A message to increase vertex count is in a wrong place: "
+ + peer);
}
} else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) {
if (isMasterTask(peer)) {
- this.getAggregationRunner().addSkipAggregator(((IntWritable) e.getValue()).get());
+ this.getAggregationRunner().addSkipAggregator(
+ ((IntWritable) e.getValue()).get());
} else {
- throw new UnsupportedOperationException("A message to skip aggregators is in a wrong peer: " + peer);
+ throw new UnsupportedOperationException(
+ "A message to skip aggregators is in a wrong peer: " + peer);
}
}
}
@@ -590,14 +634,15 @@ public final class GraphJobRunner<V exte
}
- // If we applied any changes to vertices, we need to call finishAdditions and finishRemovals in the end.
+ // If we applied any changes to vertices, we need to call finishAdditions
+ // and finishRemovals in the end.
if (dynamicAdditions) {
finishAdditions();
}
if (dynamicRemovals) {
finishRemovals();
}
-
+
return msg;
}
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1553069&r1=1553068&r2=1553069&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Mon Dec 23 04:56:29 2013
@@ -44,7 +44,8 @@ import org.junit.Before;
public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
String[] input = new String[] { "stackoverflow.com\tyahoo.com",
- "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+ "facebook.com\ttwitter.com",
+ "facebook.com\tgoogle.com\tnasa.gov",
"yahoo.com\tnasa.gov\tstackoverflow.com",
"twitter.com\tgoogle.com\tfacebook.com",
"nasa.gov\tyahoo.com\tstackoverflow.com",
@@ -52,11 +53,11 @@ public class TestSubmitGraphJob extends
private static String INPUT = "/tmp/pagerank/real-tmp.seq";
private static String OUTPUT = "/tmp/pagerank/real-out";
-
+ @SuppressWarnings("rawtypes")
private static final List<Class<? extends VerticesInfo>> vi = new ArrayList<Class<? extends VerticesInfo>>();
@Before
- public void setUp() throws Exception {
+ public void setUp() throws Exception {
super.setUp();
vi.add(ListVerticesInfo.class);
vi.add(DiskVerticesInfo.class);
@@ -113,9 +114,11 @@ public class TestSubmitGraphJob extends
}
}
+ @SuppressWarnings("rawtypes")
protected void injectVerticesInfo() {
- Class<? extends VerticesInfo> verticesInfoClass = vi.get(Math.abs(new Random().nextInt() % 3));
- LOG.info("using vertices info of type : "+verticesInfoClass.getName());
+ Class<? extends VerticesInfo> verticesInfoClass = vi.get(Math
+ .abs(new Random().nextInt() % 3));
+ LOG.info("using vertices info of type : " + verticesInfoClass.getName());
}
private void verifyResult() throws IOException {
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1553069&r1=1553068&r2=1553069&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Mon Dec 23 04:56:29 2013
@@ -43,7 +43,7 @@ public class PageRank {
public static class PageRankVertex extends
Vertex<Text, NullWritable, DoubleWritable> {
-
+
static double DAMPING_FACTOR = 0.85;
static double MAXIMUM_CONVERGENCE_ERROR = 0.001;