You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/01/08 02:45:49 UTC

svn commit: r1430109 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Author: edwardyoon
Date: Tue Jan  8 01:45:49 2013
New Revision: 1430109

URL: http://svn.apache.org/viewvc?rev=1430109&view=rev
Log:
revert my test codes

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1430109&r1=1430108&r2=1430109&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Jan  8 01:45:49 2013
@@ -105,8 +105,6 @@ public final class GraphJobRunner<V exte
 
   }
 
-  Map<V, List<M>> messages = null;
-
   @Override
   public final void bsp(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -120,7 +118,7 @@ public final class GraphJobRunner<V exte
       peer.sync();
 
       // note that the messages must be parsed here
-      messages = parseMessages(peer);
+      final Map<V, List<M>> messages = parseMessages(peer);
       // master needs to update
       doMasterUpdates(peer);
       // if aggregators say we don't have updates anymore, break
@@ -128,7 +126,7 @@ public final class GraphJobRunner<V exte
         break;
       }
       // loop over vertices and do their computation
-      doSuperstep(peer);
+      doSuperstep(messages, peer);
 
       if (isMasterTask(peer)) {
         peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
@@ -177,7 +175,7 @@ public final class GraphJobRunner<V exte
    * Do the main logic of a superstep, namely checking if vertices are active,
    * feeding compute with messages and controlling combiners/aggregators.
    */
-  private void doSuperstep(
+  private void doSuperstep(Map<V, List<M>> messages,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
@@ -204,14 +202,10 @@ public final class GraphJobRunner<V exte
           activeVertices++;
         }
       }
-
-      msgs = null;
-      messages.remove(vertex.getVertexID());
     }
 
     aggregationRunner.sendAggregatorValues(peer, activeVertices);
     iteration++;
-    messages = new HashMap<V, List<M>>();
   }
 
   /**
@@ -337,8 +331,8 @@ public final class GraphJobRunner<V exte
   @SuppressWarnings("unchecked")
   private void repair(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      boolean selfReference) throws IOException, SyncException,
-      InterruptedException {
+      boolean selfReference) throws IOException,
+      SyncException, InterruptedException {
 
     Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
 
@@ -414,8 +408,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     GraphJobMessage msg = null;
-    Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
-    
+    final Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
     while ((msg = peer.getCurrentMessage()) != null) {
       // either this is a vertex message or a directive that must be read
       // as map
@@ -452,7 +445,6 @@ public final class GraphJobRunner<V exte
       }
 
     }
-    
     return msgMap;
   }
 
@@ -541,3 +533,4 @@ public final class GraphJobRunner<V exte
   }
 
 }
+