You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/02/28 06:01:59 UTC

svn commit: r1294462 - /incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Author: edwardyoon
Date: Tue Feb 28 05:01:58 2012
New Revision: 1294462

URL: http://svn.apache.org/viewvc?rev=1294462&view=rev
Log:
Exit if no update was made

Modified:
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1294462&r1=1294461&r2=1294462&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Feb 28 05:01:58 2012
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -41,6 +42,8 @@ import org.apache.hama.util.KeyValuePair
 public class GraphJobRunner extends BSP {
   public static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
   private Map<String, Vertex> vertices = new HashMap<String, Vertex>();
+  private String masterTask;
+  private String FLAG_MESSAGE = "hama.graph.msg.counts";
 
   @SuppressWarnings("unchecked")
   @Override
@@ -52,6 +55,7 @@ public class GraphJobRunner extends BSP 
     boolean updated = true;
     int iteration = 0;
     while (updated && iteration < maxIteration) {
+      int globalUpdateCounts = 0;
       peer.sync();
 
       MapWritable msg = null;
@@ -60,27 +64,50 @@ public class GraphJobRunner extends BSP 
 
         for (Entry<Writable, Writable> e : msg.entrySet()) {
           String vertexID = ((Text) e.getKey()).toString();
-          Writable value = e.getValue();
 
-          if (msgMap.containsKey(vertexID)) {
-            LinkedList<Writable> msgs = msgMap.get(vertexID);
-            msgs.add(value);
-            msgMap.put(vertexID, msgs);
+          if (vertexID.toString().equals(FLAG_MESSAGE)) {
+            if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
+              updated = false;
+            } else {
+              globalUpdateCounts += ((IntWritable) e.getValue()).get();
+            }
           } else {
-            LinkedList<Writable> msgs = new LinkedList<Writable>();
-            msgs.add(value);
-            msgMap.put(vertexID, msgs);
-          }
+            Writable value = e.getValue();
 
+            if (msgMap.containsKey(vertexID)) {
+              LinkedList<Writable> msgs = msgMap.get(vertexID);
+              msgs.add(value);
+              msgMap.put(vertexID, msgs);
+            } else {
+              LinkedList<Writable> msgs = new LinkedList<Writable>();
+              msgs.add(value);
+              msgMap.put(vertexID, msgs);
+            }
+          }
         }
       }
 
-      if (msgMap.size() < 1) {
-        updated = false;
+      // exit if there's no update made
+      if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask)
+          && peer.getSuperstepCount() > 1) {
+        MapWritable updatedCnt = new MapWritable();
+        updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(
+            Integer.MIN_VALUE));
+
+        for (String peerName : peer.getAllPeerNames()) {
+          peer.send(peerName, updatedCnt);
+        }
       }
 
+      // send msgCounts to the master task
+      MapWritable updatedCnt = new MapWritable();
+      updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(msgMap.size()));
+      peer.send(masterTask, updatedCnt);
+
       for (Map.Entry<String, LinkedList<Writable>> e : msgMap.entrySet()) {
-        vertices.get(e.getKey()).compute(e.getValue().iterator());
+        if (e.getValue().size() > 0) {
+          vertices.get(e.getKey()).compute(e.getValue().iterator());
+        }
       }
       iteration++;
     }
@@ -90,6 +117,8 @@ public class GraphJobRunner extends BSP 
   public void setup(BSPPeer peer) throws IOException, SyncException,
       InterruptedException {
     Configuration conf = peer.getConfiguration();
+    // Choose one as a master to collect global updates
+    masterTask = peer.getPeerName(0);
     LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
 
     KeyValuePair<? extends VertexWritable, ? extends VertexArrayWritable> next = null;