You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/23 01:24:05 UTC
svn commit: r1675515 -
/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Author: edwardyoon
Date: Wed Apr 22 23:24:05 2015
New Revision: 1675515
URL: http://svn.apache.org/r1675515
Log:
minor improvements
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675515&r1=1675514&r2=1675515&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Apr 22 23:24:05 2015
@@ -264,7 +264,7 @@ public final class GraphJobRunner<V exte
executor.shutdown();
try {
- executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+ executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error(e);
}
@@ -313,7 +313,7 @@ public final class GraphJobRunner<V exte
executor.shutdown();
try {
- executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+ executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error(e);
}
@@ -412,8 +412,7 @@ public final class GraphJobRunner<V exte
EDGE_VALUE_CLASS = edgeValueClass;
}
- private final ConcurrentHashMap<String, GraphJobMessage> messages = new ConcurrentHashMap<String, GraphJobMessage>();
- private VertexInputReader<Writable, Writable, V, E, M> reader;
+ private final ConcurrentHashMap<Integer, GraphJobMessage> messages = new ConcurrentHashMap<Integer, GraphJobMessage>();
/**
* Loads vertices into memory of each peer.
@@ -423,7 +422,7 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
- reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
+ VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
.newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
VertexInputReader.class));
@@ -435,17 +434,8 @@ public final class GraphJobRunner<V exte
try {
KeyValuePair<Writable, Writable> next = null;
while ((next = peer.readNext()) != null) {
- Vertex<V, E, M> vertex = GraphJobRunner
- .<V, E, M> newVertexInstance(VERTEX_CLASS);
-
- boolean vertexFinished = reader.parseVertex(next.getKey(),
- next.getValue(), vertex);
- if (!vertexFinished) {
- continue;
- }
-
- Runnable worker = new LoadWorker(vertex);
+ Runnable worker = new Parser(next.getKey(), next.getValue(), reader);
executor.execute(worker);
}
@@ -454,58 +444,95 @@ public final class GraphJobRunner<V exte
}
executor.shutdown();
- executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+ executor.awaitTermination(60, TimeUnit.SECONDS);
- Iterator<Entry<String, GraphJobMessage>> it;
+ Iterator<Entry<Integer, GraphJobMessage>> it;
it = messages.entrySet().iterator();
while (it.hasNext()) {
- Entry<String, GraphJobMessage> e = it.next();
+ Entry<Integer, GraphJobMessage> e = it.next();
it.remove();
GraphJobMessage msg = e.getValue();
msg.setFlag(GraphJobMessage.PARTITION_FLAG);
- peer.send(e.getKey(), msg);
+ peer.send(getHostName(e.getKey()), msg);
}
peer.sync();
+ executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
+ executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+ executor.setRejectedExecutionHandler(retryHandler);
+
GraphJobMessage msg;
while ((msg = peer.getCurrentMessage()) != null) {
+ executor.execute(new AddVertex(msg));
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+
+ LOG.info(vertices.size() + " vertices are loaded into "
+ + peer.getPeerName());
+ }
+
+ class AddVertex implements Runnable {
+ GraphJobMessage msg;
+
+ public AddVertex(GraphJobMessage msg) {
+ this.msg = msg;
+ }
+
+ @Override
+ public void run() {
ByteArrayInputStream bis = new ByteArrayInputStream(msg.getValuesBytes());
DataInputStream dis = new DataInputStream(bis);
for (int i = 0; i < msg.getNumOfValues(); i++) {
- Vertex<V, E, M> vertex = newVertexInstance(VERTEX_CLASS);
- vertex.readFields(dis);
+ try {
+ Vertex<V, E, M> vertex = newVertexInstance(VERTEX_CLASS);
+ vertex.readFields(dis);
- addVertex(vertex);
+ addVertex(vertex);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
}
- LOG.info(vertices.size() + " vertices are loaded into "
- + peer.getPeerName());
}
- class LoadWorker implements Runnable {
- Vertex<V, E, M> vertex;
-
- public LoadWorker(Vertex<V, E, M> vertex) {
- this.vertex = vertex;
+ class Parser implements Runnable {
+ Writable key;
+ Writable value;
+ VertexInputReader<Writable, Writable, V, E, M> reader;
+
+ public Parser(Writable key, Writable value,
+ VertexInputReader<Writable, Writable, V, E, M> reader) {
+ this.key = key;
+ this.value = value;
+ this.reader = reader;
}
@Override
public void run() {
try {
- String dstHost = getHostName(vertex.getVertexID());
- if (peer.getPeerName().equals(dstHost)) {
- addVertex(vertex);
- } else {
- if (!messages.containsKey(dstHost)) {
- messages.putIfAbsent(dstHost, new GraphJobMessage());
+ Vertex<V, E, M> vertex = GraphJobRunner
+ .<V, E, M> newVertexInstance(VERTEX_CLASS);
+
+ boolean vertexFinished = reader.parseVertex(key, value, vertex);
+
+ if (vertexFinished) {
+ int partition = getPartitionID(vertex.getVertexID());
+
+ if (peer.getPeerIndex() == partition) {
+ addVertex(vertex);
+ } else {
+ if (!messages.containsKey(partition)) {
+ messages.putIfAbsent(partition, new GraphJobMessage());
+ }
+ messages.get(partition).add(serialize(vertex));
}
- messages.get(dstHost).add(serialize(vertex));
}
-
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
@@ -751,6 +778,14 @@ public final class GraphJobRunner<V exte
peer.getNumPeers()));
}
+ public int getPartitionID(V vertexID) {
+ return partitioner.getPartition(vertexID, null, peer.getNumPeers());
+ }
+
+ public String getHostName(int partitionID) {
+ return peer.getPeerName(partitionID);
+ }
+
/**
* @return the number of vertices, globally accumulated.
*/