You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/13 11:29:19 UTC

svn commit: r1673143 - in /hama/trunk/graph/src/main/java/org/apache/hama/graph: GraphJobRunner.java IncomingVertexMessageManager.java MapVerticesInfo.java

Author: edwardyoon
Date: Mon Apr 13 09:29:19 2015
New Revision: 1673143

URL: http://svn.apache.org/r1673143
Log:
load vertices concurrently

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673143&r1=1673142&r2=1673143&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Apr 13 09:29:19 2015
@@ -437,6 +437,8 @@ public final class GraphJobRunner<V exte
     try {
       KeyValuePair<Writable, Writable> next = null;
       while ((next = peer.readNext()) != null) {
+        // TODO read sequentially, and convert records using thread.
+        
         Vertex<V, E, M> vertex = GraphJobRunner
             .<V, E, M> newVertexInstance(VERTEX_CLASS);
 
@@ -445,23 +447,64 @@ public final class GraphJobRunner<V exte
         if (!vertexFinished) {
           continue;
         }
-        peer.send(getHostName(vertex.getVertexID()),
-            new GraphJobMessage(vertex));
+
+        String dstHost = getHostName(vertex.getVertexID());
+        if (peer.getPeerName().equals(dstHost)) {
+          addVertex(vertex);
+        } else {
+          peer.send(dstHost, new GraphJobMessage(vertex));
+        }
       }
     } catch (Exception e) {
       e.printStackTrace();
     }
     peer.sync();
 
-    GraphJobMessage received;
-    while ((received = peer.getCurrentMessage()) != null) {
-      addVertex((Vertex<V, E, M>) received.getVertex());
+    List<List<GraphJobMessage>> subLists = peer.getSubLists(conf.getInt(
+        "hama.graph.thread.num", 100));
+    List<Thread> runners = new ArrayList<Thread>(subLists.size());
+
+    for (List<GraphJobMessage> subList : subLists) {
+      runners.add(new LoadReceivedMessage(subList));
+    }
+
+    for (Thread computer : runners) {
+      computer.start();
+    }
+
+    for (Thread computer : runners) {
+      try {
+        computer.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
     }
 
     LOG.info(vertices.size() + " vertices are loaded into "
         + peer.getPeerName());
   }
 
+  class LoadReceivedMessage extends Thread {
+    List<GraphJobMessage> subList;
+
+    public LoadReceivedMessage(List<GraphJobMessage> subList) {
+      this.subList = subList;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+      try {
+        for (GraphJobMessage msg : subList) {
+          System.out.println("adding vertex ");
+          addVertex((Vertex<V, E, M>) msg.getVertex());
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
   /**
    * Add new vertex into memory of each peer.
    * 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1673143&r1=1673142&r2=1673143&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Mon Apr 13 09:29:19 2015
@@ -17,6 +17,7 @@
  */
 package org.apache.hama.graph;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -27,6 +28,8 @@ import org.apache.hama.bsp.TaskAttemptID
 import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.message.queue.SynchronizedQueue;
 
+import com.google.common.collect.Lists;
+
 public class IncomingVertexMessageManager<M extends WritableComparable<M>>
     implements SynchronizedQueue<GraphJobMessage> {
 
@@ -49,7 +52,7 @@ public class IncomingVertexMessageManage
   public void addBundle(BSPMessageBundle<GraphJobMessage> bundle) {
     addAll(bundle);
   }
-  
+
   @Override
   public void addAll(Iterable<GraphJobMessage> col) {
     for (GraphJobMessage m : col)
@@ -110,7 +113,10 @@ public class IncomingVertexMessageManage
 
   @Override
   public List<List<GraphJobMessage>> getSubLists(int num) {
-    return msgPerVertex.getSubLists(num);
+    if (mapMessages.size() > 0)
+      return Lists.partition(new ArrayList<GraphJobMessage>(mapMessages), num);
+    else
+      return msgPerVertex.getSubLists(num);
   }
 
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1673143&r1=1673142&r2=1673143&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Mon Apr 13 09:29:19 2015
@@ -19,11 +19,11 @@ package org.apache.hama.graph;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -44,7 +44,7 @@ import com.google.common.collect.Sets;
  */
 public final class MapVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
     implements VerticesInfo<V, E, M> {
-  private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
+  private final Map<V, Vertex<V, E, M>> vertices = new ConcurrentHashMap<V, Vertex<V, E, M>>();
 
   private Set<V> computedVertices;