You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/01/08 02:45:49 UTC
svn commit: r1430109 -
/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Author: edwardyoon
Date: Tue Jan 8 01:45:49 2013
New Revision: 1430109
URL: http://svn.apache.org/viewvc?rev=1430109&view=rev
Log:
revert my test codes
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1430109&r1=1430108&r2=1430109&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Jan 8 01:45:49 2013
@@ -105,8 +105,6 @@ public final class GraphJobRunner<V exte
}
- Map<V, List<M>> messages = null;
-
@Override
public final void bsp(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -120,7 +118,7 @@ public final class GraphJobRunner<V exte
peer.sync();
// note that the messages must be parsed here
- messages = parseMessages(peer);
+ final Map<V, List<M>> messages = parseMessages(peer);
// master needs to update
doMasterUpdates(peer);
// if aggregators say we don't have updates anymore, break
@@ -128,7 +126,7 @@ public final class GraphJobRunner<V exte
break;
}
// loop over vertices and do their computation
- doSuperstep(peer);
+ doSuperstep(messages, peer);
if (isMasterTask(peer)) {
peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
@@ -177,7 +175,7 @@ public final class GraphJobRunner<V exte
* Do the main logic of a superstep, namely checking if vertices are active,
* feeding compute with messages and controlling combiners/aggregators.
*/
- private void doSuperstep(
+ private void doSuperstep(Map<V, List<M>> messages,
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
int activeVertices = 0;
@@ -204,14 +202,10 @@ public final class GraphJobRunner<V exte
activeVertices++;
}
}
-
- msgs = null;
- messages.remove(vertex.getVertexID());
}
aggregationRunner.sendAggregatorValues(peer, activeVertices);
iteration++;
- messages = new HashMap<V, List<M>>();
}
/**
@@ -337,8 +331,8 @@ public final class GraphJobRunner<V exte
@SuppressWarnings("unchecked")
private void repair(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
- boolean selfReference) throws IOException, SyncException,
- InterruptedException {
+ boolean selfReference) throws IOException,
+ SyncException, InterruptedException {
Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
@@ -414,8 +408,7 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
GraphJobMessage msg = null;
- Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
-
+ final Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
while ((msg = peer.getCurrentMessage()) != null) {
// either this is a vertex message or a directive that must be read
// as map
@@ -452,7 +445,6 @@ public final class GraphJobRunner<V exte
}
}
-
return msgMap;
}
@@ -541,3 +533,4 @@ public final class GraphJobRunner<V exte
}
}
+