You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/23 03:40:31 UTC
svn commit: r1675531 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Author: edwardyoon
Date: Thu Apr 23 01:40:31 2015
New Revision: 1675531
URL: http://svn.apache.org/r1675531
Log:
minor improvements
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675531&r1=1675530&r2=1675531&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Apr 23 01:40:31 2015
@@ -422,6 +422,10 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
+ for (int i = 0; i < peer.getNumPeers(); i++) {
+ messages.put(i, new GraphJobMessage());
+ }
+
VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
.newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
VertexInputReader.class));
@@ -434,8 +438,16 @@ public final class GraphJobRunner<V exte
try {
KeyValuePair<Writable, Writable> next = null;
while ((next = peer.readNext()) != null) {
+ Vertex<V, E, M> vertex = GraphJobRunner
+ .<V, E, M> newVertexInstance(VERTEX_CLASS);
+
+ boolean vertexFinished = reader.parseVertex(next.getKey(),
+ next.getValue(), vertex);
- Runnable worker = new Parser(next.getKey(), next.getValue(), reader);
+ if (!vertexFinished) {
+ continue;
+ }
+ Runnable worker = new Parser(vertex);
executor.execute(worker);
}
@@ -501,36 +513,21 @@ public final class GraphJobRunner<V exte
}
class Parser implements Runnable {
- Writable key;
- Writable value;
- VertexInputReader<Writable, Writable, V, E, M> reader;
-
- public Parser(Writable key, Writable value,
- VertexInputReader<Writable, Writable, V, E, M> reader) {
- this.key = key;
- this.value = value;
- this.reader = reader;
+ Vertex<V, E, M> vertex;
+
+ public Parser(Vertex<V, E, M> vertex) {
+ this.vertex = vertex;
}
@Override
public void run() {
try {
- Vertex<V, E, M> vertex = GraphJobRunner
- .<V, E, M> newVertexInstance(VERTEX_CLASS);
+ int partition = getPartitionID(vertex.getVertexID());
- boolean vertexFinished = reader.parseVertex(key, value, vertex);
-
- if (vertexFinished) {
- int partition = getPartitionID(vertex.getVertexID());
-
- if (peer.getPeerIndex() == partition) {
- addVertex(vertex);
- } else {
- if (!messages.containsKey(partition)) {
- messages.putIfAbsent(partition, new GraphJobMessage());
- }
- messages.get(partition).add(serialize(vertex));
- }
+ if (peer.getPeerIndex() == partition) {
+ addVertex(vertex);
+ } else {
+ messages.get(partition).add(serialize(vertex));
}
} catch (Exception e) {
e.printStackTrace();