You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/23 03:40:31 UTC

svn commit: r1675531 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Author: edwardyoon
Date: Thu Apr 23 01:40:31 2015
New Revision: 1675531

URL: http://svn.apache.org/r1675531
Log:
minor improvements

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675531&r1=1675530&r2=1675531&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Apr 23 01:40:31 2015
@@ -422,6 +422,10 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
+    for (int i = 0; i < peer.getNumPeers(); i++) {
+      messages.put(i, new GraphJobMessage());
+    }
+
     VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
         .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
             VertexInputReader.class));
@@ -434,8 +438,16 @@ public final class GraphJobRunner<V exte
     try {
       KeyValuePair<Writable, Writable> next = null;
       while ((next = peer.readNext()) != null) {
+        Vertex<V, E, M> vertex = GraphJobRunner
+            .<V, E, M> newVertexInstance(VERTEX_CLASS);
+
+        boolean vertexFinished = reader.parseVertex(next.getKey(),
+            next.getValue(), vertex);
 
-        Runnable worker = new Parser(next.getKey(), next.getValue(), reader);
+        if (!vertexFinished) {
+          continue;
+        }
+        Runnable worker = new Parser(vertex);
         executor.execute(worker);
 
       }
@@ -501,36 +513,21 @@ public final class GraphJobRunner<V exte
   }
 
   class Parser implements Runnable {
-    Writable key;
-    Writable value;
-    VertexInputReader<Writable, Writable, V, E, M> reader;
-
-    public Parser(Writable key, Writable value,
-        VertexInputReader<Writable, Writable, V, E, M> reader) {
-      this.key = key;
-      this.value = value;
-      this.reader = reader;
+    Vertex<V, E, M> vertex;
+
+    public Parser(Vertex<V, E, M> vertex) {
+      this.vertex = vertex;
     }
 
     @Override
     public void run() {
       try {
-        Vertex<V, E, M> vertex = GraphJobRunner
-            .<V, E, M> newVertexInstance(VERTEX_CLASS);
+        int partition = getPartitionID(vertex.getVertexID());
 
-        boolean vertexFinished = reader.parseVertex(key, value, vertex);
-
-        if (vertexFinished) {
-          int partition = getPartitionID(vertex.getVertexID());
-
-          if (peer.getPeerIndex() == partition) {
-            addVertex(vertex);
-          } else {
-            if (!messages.containsKey(partition)) {
-              messages.putIfAbsent(partition, new GraphJobMessage());
-            }
-            messages.get(partition).add(serialize(vertex));
-          }
+        if (peer.getPeerIndex() == partition) {
+          addVertex(vertex);
+        } else {
+          messages.get(partition).add(serialize(vertex));
         }
       } catch (Exception e) {
         e.printStackTrace();