You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/13 11:29:19 UTC
svn commit: r1673143 - in /hama/trunk/graph/src/main/java/org/apache/hama/graph: GraphJobRunner.java IncomingVertexMessageManager.java MapVerticesInfo.java
Author: edwardyoon
Date: Mon Apr 13 09:29:19 2015
New Revision: 1673143
URL: http://svn.apache.org/r1673143
Log:
load vertices concurrently
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673143&r1=1673142&r2=1673143&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Apr 13 09:29:19 2015
@@ -437,6 +437,8 @@ public final class GraphJobRunner<V exte
try {
KeyValuePair<Writable, Writable> next = null;
while ((next = peer.readNext()) != null) {
+ // TODO read sequentially, and convert records using thread.
+
Vertex<V, E, M> vertex = GraphJobRunner
.<V, E, M> newVertexInstance(VERTEX_CLASS);
@@ -445,23 +447,64 @@ public final class GraphJobRunner<V exte
if (!vertexFinished) {
continue;
}
- peer.send(getHostName(vertex.getVertexID()),
- new GraphJobMessage(vertex));
+
+ String dstHost = getHostName(vertex.getVertexID());
+ if (peer.getPeerName().equals(dstHost)) {
+ addVertex(vertex);
+ } else {
+ peer.send(dstHost, new GraphJobMessage(vertex));
+ }
}
} catch (Exception e) {
e.printStackTrace();
}
peer.sync();
- GraphJobMessage received;
- while ((received = peer.getCurrentMessage()) != null) {
- addVertex((Vertex<V, E, M>) received.getVertex());
+ List<List<GraphJobMessage>> subLists = peer.getSubLists(conf.getInt(
+ "hama.graph.thread.num", 100));
+ List<Thread> runners = new ArrayList<Thread>(subLists.size());
+
+ for (List<GraphJobMessage> subList : subLists) {
+ runners.add(new LoadReceivedMessage(subList));
+ }
+
+ for (Thread computer : runners) {
+ computer.start();
+ }
+
+ for (Thread computer : runners) {
+ try {
+ computer.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
LOG.info(vertices.size() + " vertices are loaded into "
+ peer.getPeerName());
}
+ class LoadReceivedMessage extends Thread {
+ List<GraphJobMessage> subList;
+
+ public LoadReceivedMessage(List<GraphJobMessage> subList) {
+ this.subList = subList;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ try {
+ for (GraphJobMessage msg : subList) {
+ System.out.println("adding vertex ");
+ addVertex((Vertex<V, E, M>) msg.getVertex());
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
/**
* Add new vertex into memory of each peer.
*
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1673143&r1=1673142&r2=1673143&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Mon Apr 13 09:29:19 2015
@@ -17,6 +17,7 @@
*/
package org.apache.hama.graph;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -27,6 +28,8 @@ import org.apache.hama.bsp.TaskAttemptID
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.SynchronizedQueue;
+import com.google.common.collect.Lists;
+
public class IncomingVertexMessageManager<M extends WritableComparable<M>>
implements SynchronizedQueue<GraphJobMessage> {
@@ -49,7 +52,7 @@ public class IncomingVertexMessageManage
public void addBundle(BSPMessageBundle<GraphJobMessage> bundle) {
addAll(bundle);
}
-
+
@Override
public void addAll(Iterable<GraphJobMessage> col) {
for (GraphJobMessage m : col)
@@ -110,7 +113,10 @@ public class IncomingVertexMessageManage
@Override
public List<List<GraphJobMessage>> getSubLists(int num) {
- return msgPerVertex.getSubLists(num);
+ if (mapMessages.size() > 0)
+ return Lists.partition(new ArrayList<GraphJobMessage>(mapMessages), num);
+ else
+ return msgPerVertex.getSubLists(num);
}
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1673143&r1=1673142&r2=1673143&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Mon Apr 13 09:29:19 2015
@@ -19,11 +19,11 @@ package org.apache.hama.graph;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -44,7 +44,7 @@ import com.google.common.collect.Sets;
*/
public final class MapVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
- private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
+ private final Map<V, Vertex<V, E, M>> vertices = new ConcurrentHashMap<V, Vertex<V, E, M>>();
private Set<V> computedVertices;