You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/08/20 13:04:30 UTC

svn commit: r1374968 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Author: tjungblut
Date: Mon Aug 20 11:04:30 2012
New Revision: 1374968

URL: http://svn.apache.org/viewvc?rev=1374968&view=rev
Log:
revert the revert

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1374968&r1=1374967&r2=1374968&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Aug 20 11:04:30 2012
@@ -391,16 +391,16 @@ public final class GraphJobRunner<V exte
     Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
     vertex.setPeer(peer);
     vertex.runner = this;
-    while (true) {
-      KeyValuePair<Writable, Writable> next = peer.readNext();
-      if (next == null) {
-        break;
-      }
+
+    KeyValuePair<Writable, Writable> next = null;
+    int lines = 0;
+    while ((next = peer.readNext()) != null) {
       boolean vertexFinished = reader.parseVertex(next.getKey(),
           next.getValue(), vertex);
       if (!vertexFinished) {
         continue;
       }
+
       if (vertex.getEdges() == null) {
         vertex.setEdges(new ArrayList<Edge<V, E>>(0));
       }
@@ -420,12 +420,26 @@ public final class GraphJobRunner<V exte
         }
         peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
       } else {
+        // FIXME need to set destination names
         vertex.setup(conf);
         vertices.put(vertex.getVertexID(), vertex);
       }
       vertex = newVertexInstance(vertexClass, conf);
       vertex.setPeer(peer);
       vertex.runner = this;
+
+      lines++;
+      if ((lines % 100000) == 0) {
+        peer.sync();
+        GraphJobMessage msg = null;
+        while ((msg = peer.getCurrentMessage()) != null) {
+          Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+          messagedVertex.setPeer(peer);
+          messagedVertex.runner = this;
+          messagedVertex.setup(conf);
+          vertices.put(messagedVertex.getVertexID(), messagedVertex);
+        }
+      }
     }
 
     if (runtimePartitioning) {
@@ -440,6 +454,8 @@ public final class GraphJobRunner<V exte
       }
     }
 
+    LOG.info("Loading finished at " + peer.getSuperstepCount() + " steps.");
+
     /*
      * If the user want to repair the graph, it should traverse through that
      * local chunk of adjancency list and message the corresponding peer to