You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/02/28 06:01:59 UTC
svn commit: r1294462 -
/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Author: edwardyoon
Date: Tue Feb 28 05:01:58 2012
New Revision: 1294462
URL: http://svn.apache.org/viewvc?rev=1294462&view=rev
Log:
Exit if there's no update made
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1294462&r1=1294461&r2=1294462&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Feb 28 05:01:58 2012
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -41,6 +42,8 @@ import org.apache.hama.util.KeyValuePair
public class GraphJobRunner extends BSP {
public static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
private Map<String, Vertex> vertices = new HashMap<String, Vertex>();
+ private String masterTask;
+ private String FLAG_MESSAGE = "hama.graph.msg.counts";
@SuppressWarnings("unchecked")
@Override
@@ -52,6 +55,7 @@ public class GraphJobRunner extends BSP
boolean updated = true;
int iteration = 0;
while (updated && iteration < maxIteration) {
+ int globalUpdateCounts = 0;
peer.sync();
MapWritable msg = null;
@@ -60,27 +64,50 @@ public class GraphJobRunner extends BSP
for (Entry<Writable, Writable> e : msg.entrySet()) {
String vertexID = ((Text) e.getKey()).toString();
- Writable value = e.getValue();
- if (msgMap.containsKey(vertexID)) {
- LinkedList<Writable> msgs = msgMap.get(vertexID);
- msgs.add(value);
- msgMap.put(vertexID, msgs);
+ if (vertexID.toString().equals(FLAG_MESSAGE)) {
+ if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
+ updated = false;
+ } else {
+ globalUpdateCounts += ((IntWritable) e.getValue()).get();
+ }
} else {
- LinkedList<Writable> msgs = new LinkedList<Writable>();
- msgs.add(value);
- msgMap.put(vertexID, msgs);
- }
+ Writable value = e.getValue();
+ if (msgMap.containsKey(vertexID)) {
+ LinkedList<Writable> msgs = msgMap.get(vertexID);
+ msgs.add(value);
+ msgMap.put(vertexID, msgs);
+ } else {
+ LinkedList<Writable> msgs = new LinkedList<Writable>();
+ msgs.add(value);
+ msgMap.put(vertexID, msgs);
+ }
+ }
}
}
- if (msgMap.size() < 1) {
- updated = false;
+ // exit if there's no update made
+ if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask)
+ && peer.getSuperstepCount() > 1) {
+ MapWritable updatedCnt = new MapWritable();
+ updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(
+ Integer.MIN_VALUE));
+
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, updatedCnt);
+ }
}
+ // send msgCounts to the master task
+ MapWritable updatedCnt = new MapWritable();
+ updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(msgMap.size()));
+ peer.send(masterTask, updatedCnt);
+
for (Map.Entry<String, LinkedList<Writable>> e : msgMap.entrySet()) {
- vertices.get(e.getKey()).compute(e.getValue().iterator());
+ if (e.getValue().size() > 0) {
+ vertices.get(e.getKey()).compute(e.getValue().iterator());
+ }
}
iteration++;
}
@@ -90,6 +117,8 @@ public class GraphJobRunner extends BSP
public void setup(BSPPeer peer) throws IOException, SyncException,
InterruptedException {
Configuration conf = peer.getConfiguration();
+ // Choose one as a master to collect global updates
+ masterTask = peer.getPeerName(0);
LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
KeyValuePair<? extends VertexWritable, ? extends VertexArrayWritable> next = null;