You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/17 02:25:42 UTC
svn commit: r1674173 - in /hama/trunk:
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/message/
core/src/test/java/org/apache/hama/bsp/
core/src/test/java/org/apache/hama/bsp/message/
core/src/test/java/org/apache/hama/bs...
Author: edwardyoon
Date: Fri Apr 17 00:25:42 2015
New Revision: 1674173
URL: http://svn.apache.org/r1674173
Log:
HAMA-951: Make BSPMessageBundle thread-safe
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Fri Apr 17 00:25:42 2015
@@ -17,20 +17,18 @@
*/
package org.apache.hama.bsp;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
+import org.apache.hama.util.ReflectionUtils;
/**
* BSPMessageBundle stores a group of messages so that they can be sent in batch
@@ -42,15 +40,9 @@ public class BSPMessageBundle<M extends
public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
- private String className = null;
- private int bundleSize = 0;
-
- private Kryo kryo = new Kryo();
- private ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- private Output output = new Output(outputStream, 4096);
+ private List<M> messages = new ArrayList<M>();
public BSPMessageBundle() {
- bundleSize = 0;
}
/**
@@ -59,115 +51,54 @@ public class BSPMessageBundle<M extends
* @param message BSPMessage to add.
*/
public void addMessage(M message) {
- if (className == null) {
- className = message.getClass().getName();
- kryo.register(message.getClass());
- }
-
- kryo.writeObject(output, message);
- bundleSize++;
- }
-
- public void addMessages(Iterator<M> iterator) {
- M message = iterator.next();
- if (className == null) {
- className = message.getClass().getName();
- kryo.register(message.getClass());
- }
-
- kryo.writeObject(output, message);
- bundleSize++;
-
- while(iterator.hasNext()) {
- kryo.writeObject(output, iterator.next());
- bundleSize++;
- }
- }
-
- public void finishAddition() {
- output.flush();
- }
-
- public byte[] getBuffer() {
- return outputStream.toByteArray();
+ messages.add(message);
}
- private ByteArrayInputStream bis = null;
- private Input in = null;
+ public void addMessages(Collection<M> msgs) {
+ messages.addAll(msgs);
+ }
public Iterator<M> iterator() {
- bis = new ByteArrayInputStream(outputStream.toByteArray());
- in = new Input(bis, 4096);
-
- Iterator<M> it = new Iterator<M>() {
- Class<M> clazz = null;
- int counter = 0;
-
- @Override
- public boolean hasNext() {
- if ((bundleSize - counter) > 0) {
- return true;
- } else {
- return false;
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public M next() {
- try {
- if (clazz == null) {
- clazz = (Class<M>) Class.forName(className);
- }
- } catch (ClassNotFoundException ce) {
- LOG.error("Class was not found.", ce);
- }
-
- counter++;
-
- return kryo.readObject(in, clazz);
- }
-
- @Override
- public void remove() {
- // TODO Auto-generated method stub
- }
- };
- return it;
+ return messages.iterator();
}
public int size() {
- return bundleSize;
- }
-
- /**
- * @return the byte length of messages
- * @throws IOException
- */
- public long getLength() {
- return outputStream.size();
+ return messages.size();
}
+ @SuppressWarnings("unchecked")
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(bundleSize);
- if (bundleSize > 0) {
- out.writeUTF(className);
- out.writeInt(outputStream.size());
- out.write(outputStream.toByteArray());
+ out.writeInt(messages.size());
+
+ if (messages.size() > 0) {
+ Class<M> clazz = (Class<M>) messages.get(0).getClass();
+ out.writeUTF(clazz.getName());
+
+ for (M m : messages) {
+ m.write(out);
+ }
}
}
+ @SuppressWarnings("unchecked")
@Override
public void readFields(DataInput in) throws IOException {
- this.bundleSize = in.readInt();
+ int num = in.readInt();
- if (this.bundleSize > 0) {
- className = in.readUTF();
- int bytesLength = in.readInt();
- byte[] temp = new byte[bytesLength];
- in.readFully(temp);
- outputStream.write(temp);
+ if (num > 0) {
+ Class<M> clazz = null;
+ try {
+ clazz = (Class<M>) Class.forName(in.readUTF());
+ } catch (ClassNotFoundException e) {
+ LOG.error("Class was not found.", e);
+ }
+
+ for (int i = 0; i < num; i++) {
+ M msg = ReflectionUtils.newInstance(clazz);
+ msg.readFields(in);
+ messages.add(msg);
+ }
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java Fri Apr 17 00:25:42 2015
@@ -17,7 +17,6 @@
*/
package org.apache.hama.bsp;
-import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Writable;
@@ -41,15 +40,4 @@ public interface BSPMessageBundleInterfa
*/
public Iterator<M> iterator();
- /**
- * @return the message buffer.
- */
- public byte[] getBuffer();
-
- /**
- * @return the total byte length of messages
- * @throws IOException
- */
- public long getLength();
-
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri Apr 17 00:25:42 2015
@@ -349,8 +349,8 @@ public class LocalBSPRunner implements J
@Override
public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
throws IOException {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
- bundle.getLength());
+ //peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+ // bundle.getLength());
MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Fri Apr 17 00:25:42 2015
@@ -133,8 +133,8 @@ public final class HamaAsyncMessageManag
compressed.length);
bspPeerConnection.put(compressed);
} else {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
- bundle.getLength());
+ //peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+ // bundle.getLength());
bspPeerConnection.put(bundle);
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Fri Apr 17 00:25:42 2015
@@ -133,7 +133,7 @@ public final class HamaMessageManagerImp
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_DECOMPRESSED_BYTES, byteBuffer.size());
bspPeerConnection.put(compressed);
} else {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED, bundle.getLength());
+ //peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED, bundle.getLength());
bspPeerConnection.put(bundle);
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Fri Apr 17 00:25:42 2015
@@ -70,9 +70,6 @@ public class OutgoingPOJOMessageBundle<M
@Override
public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getBundleIterator() {
- for(BSPMessageBundle<M> b : outgoingBundles.values()) {
- b.finishAddition();
- }
return outgoingBundles.entrySet().iterator();
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Fri Apr 17 00:25:42 2015
@@ -38,7 +38,6 @@ public class TestBSPMessageBundle extend
// Serialize it.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
bundle.write(new DataOutputStream(baos));
- bundle.finishAddition();
baos.close();
// Deserialize it.
BSPMessageBundle<BytesWritable> readBundle = new BSPMessageBundle<BytesWritable>();
@@ -65,7 +64,6 @@ public class TestBSPMessageBundle extend
testMessages[i] = msg;
bundle.addMessage(testMessages[i]);
}
- bundle.finishAddition();
// Serialize it.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Fri Apr 17 00:25:42 2015
@@ -653,7 +653,6 @@ public class TestCheckpoint extends Test
BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
assertEquals(5, bundleRead.size());
- bundleRead.finishAddition();
String recoveredMsg = bundleRead.iterator().next().toString();
assertEquals(recoveredMsg, "data");
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java Fri Apr 17 00:25:42 2015
@@ -91,8 +91,6 @@ public class TestHamaAsyncMessageManager
bundle.addMessage(it.next());
}
- bundle.finishAddition();
-
messageManager.transfer(peer, bundle);
messageManager.clearOutgoingMessages();
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java Fri Apr 17 00:25:42 2015
@@ -90,7 +90,6 @@ public class TestHamaMessageManager exte
while (it.hasNext()) {
bundle.addMessage(it.next());
}
- bundle.finishAddition();
messageManager.transfer(peer, bundle);
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Fri Apr 17 00:25:42 2015
@@ -50,8 +50,6 @@ public class TestBSPMessageCompressor ex
a.addMessage(new IntWritable(i));
}
- a.finishAddition();
-
ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
a.write(bufferDos);
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Fri Apr 17 00:25:42 2015
@@ -83,7 +83,7 @@ public class PageRankTest extends TestCa
private void generateTestData() {
try {
- FastGraphGen.main(new String[] { "-v", "60", "-e", "3", "-output_path",
+ FastGraphGen.main(new String[] { "-v", "30", "-e", "3", "-output_path",
INPUT, "-task_num", "3", "-of", "json"});
} catch (Exception e) {
e.printStackTrace();
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Fri Apr 17 00:25:42 2015
@@ -55,7 +55,6 @@ public final class GraphJobMessage imple
private WritableComparable vertexId;
private IntWritable integerMessage;
private static GraphJobMessageComparator comparator;
- private Vertex<?, ?, ?> vertex;
private int numOfValues = 0;
@@ -91,27 +90,10 @@ public final class GraphJobMessage imple
addAll(values);
}
- public GraphJobMessage(WritableComparable<?> vertexID, byte[] valuesBytes,
- int numOfValues) {
- this.flag = VERTEX_FLAG;
- this.vertexId = vertexID;
- try {
- this.byteBuffer.write(valuesBytes);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- this.numOfValues = numOfValues;
- }
-
public MapWritable getMap() {
return map;
}
- public Vertex<?, ?, ?> getVertex() {
- return vertex;
- }
-
public WritableComparable<?> getVertexId() {
return vertexId;
}
@@ -169,7 +151,8 @@ public final class GraphJobMessage imple
public GraphJobMessage(Vertex<?, ?, ?> vertex) {
this.flag = PARTITION_FLAG;
- this.vertex = vertex;
+
+ add(vertex);
}
@Override
@@ -188,7 +171,9 @@ public final class GraphJobMessage imple
} else if (isVerticesSizeMessage()) {
integerMessage.write(out);
} else if (isPartitioningMessage()) {
- vertex.write(out);
+ out.writeInt(numOfValues);
+ out.writeInt(byteBuffer.size());
+ out.write(byteBuffer.toByteArray());
} else {
vertexId.write(out);
}
@@ -235,8 +220,11 @@ public final class GraphJobMessage imple
integerMessage = new IntWritable();
integerMessage.readFields(in);
} else if (isPartitioningMessage()) {
- vertex = (Vertex<?, ?, ?>) ReflectionUtils.newInstance(GraphJobRunner.VERTEX_CLASS, null);
- vertex.readFields(in);
+ this.numOfValues = in.readInt();
+ int bytesLength = in.readInt();
+ byte[] temp = new byte[bytesLength];
+ in.readFully(temp);
+ byteBuffer.write(temp);
} else {
vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
null);
@@ -296,7 +284,7 @@ public final class GraphJobMessage imple
return "#Vertices: " + integerMessage;
} else {
return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
- + vertexId + ", vertexValue=" + numOfValues + ", " + vertex.toString() + "]";
+ + vertexId + ", vertexValue=" + numOfValues + "]";
}
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Fri Apr 17 00:25:42 2015
@@ -17,9 +17,13 @@
*/
package org.apache.hama.graph;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -278,12 +282,12 @@ public final class GraphJobRunner<V exte
this.changedVertexCnt = 0;
vertices.startSuperstep();
- ExecutorService executor = Executors.newFixedThreadPool((vertices.size()
- / conf.getInt("hama.graph.threadpool.percentage", 10)) + 1);
+ ExecutorService executor = Executors
+ .newFixedThreadPool((vertices.size() / conf.getInt(
+ "hama.graph.threadpool.percentage", 10)) + 1);
for (Vertex<V, E, M> v : vertices.getValues()) {
- Runnable worker = new ComputeRunnable(v, Collections.singleton(v
- .getValue()));
+ Runnable worker = new ComputeRunnable(v, null);
executor.execute(worker);
}
executor.shutdown();
@@ -308,8 +312,10 @@ public final class GraphJobRunner<V exte
public void run() {
try {
// call once at initial superstep
- if(iteration == 0)
+ if (iteration == 0) {
vertex.setup(conf);
+ msgs = Collections.singleton(vertex.getValue());
+ }
vertex.compute(msgs);
vertices.finishVertexComputation(vertex);
@@ -370,6 +376,8 @@ public final class GraphJobRunner<V exte
EDGE_VALUE_CLASS = edgeValueClass;
}
+ Map<String, GraphJobMessage> messages = new HashMap<String, GraphJobMessage>();
+
/**
* Loads vertices into memory of each peer.
*/
@@ -377,6 +385,9 @@ public final class GraphJobRunner<V exte
private void loadVertices(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
+ // kryo.register(GraphJobRunner
+ // .<V, E, M> newVertexInstance(VERTEX_CLASS).getClass());
+
VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
.newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
VertexInputReader.class));
@@ -401,19 +412,38 @@ public final class GraphJobRunner<V exte
Runnable worker = new LoadWorker(vertex);
executor.execute(worker);
} else {
- peer.send(dstHost, new GraphJobMessage(vertex));
+ if (!messages.containsKey(dstHost)) {
+ messages.put(dstHost, new GraphJobMessage(vertex));
+ } else {
+ messages.get(dstHost).add(vertex);
+ }
}
}
} catch (Exception e) {
e.printStackTrace();
}
+ for (Entry<String, GraphJobMessage> e : messages.entrySet()) {
+ peer.send(e.getKey(), e.getValue());
+ }
+ messages.clear();
+ messages = null;
+
peer.sync();
GraphJobMessage msg;
while ((msg = peer.getCurrentMessage()) != null) {
- Runnable worker = new LoadWorker((Vertex<V, E, M>) msg.getVertex());
- executor.execute(worker);
+ ByteArrayInputStream bis = new ByteArrayInputStream(msg.getValuesBytes());
+ DataInputStream dis = new DataInputStream(bis);
+
+ for (int i = 0; i < msg.getNumOfValues(); i++) {
+ Vertex<V, E, M> vertex = GraphJobRunner
+ .<V, E, M> newVertexInstance(VERTEX_CLASS);
+ vertex.readFields(dis);
+
+ Runnable worker = new LoadWorker(vertex);
+ executor.execute(worker);
+ }
}
executor.shutdown();
while (!executor.isTerminated()) {
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java Fri Apr 17 00:25:42 2015
@@ -17,7 +17,7 @@
*/
package org.apache.hama.graph;
-import java.util.Iterator;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -62,8 +62,8 @@ public class MessagePerVertex {
return storage.get(vertexID);
}
- public Iterator<GraphJobMessage> iterator() {
- return storage.values().iterator();
+ public Collection<GraphJobMessage> getMessages() {
+ return storage.values();
}
public GraphJobMessage pollFirstEntry() {
@@ -71,7 +71,7 @@ public class MessagePerVertex {
}
public List<List<GraphJobMessage>> getSubLists(int num) {
- return Lists.partition(Lists.newArrayList(iterator()), num);
+ return Lists.partition(Lists.newArrayList(storage.values()), num);
}
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Fri Apr 17 00:25:42 2015
@@ -108,9 +108,8 @@ public class OutgoingVertexMessageManage
MessagePerVertex msgStorage = storage.get(bundle.getKey());
if (msgStorage != null) {
- bundle.getValue().addMessages(msgStorage.iterator());
+ bundle.getValue().addMessages(msgStorage.getMessages());
}
- bundle.getValue().finishAddition();
storage.remove(bundle.getKey());
return bundle;
}