You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/02/05 05:15:50 UTC
svn commit: r1657473 - in
/hama/trunk/graph/src/main/java/org/apache/hama/graph: GraphJobMessage.java
GraphJobRunner.java IncomingVertexMessageManager.java MessagePerVertex.java
OutgoingVertexMessageManager.java
Author: edwardyoon
Date: Thu Feb 5 04:15:50 2015
New Revision: 1657473
URL: http://svn.apache.org/r1657473
Log:
This patch improves the data structure of GraphJobMessage.
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Feb 5 04:15:50 2015
@@ -71,13 +71,6 @@ public final class GraphJobMessage imple
public GraphJobMessage() {
}
- public byte[] serialize(Writable message) throws IOException {
- ByteArrayOutputStream mbos = new ByteArrayOutputStream();
- DataOutputStream mdos = new DataOutputStream(mbos);
- message.write(mdos);
- return mbos.toByteArray();
- }
-
public GraphJobMessage(MapWritable map) {
this.flag = MAP_FLAG;
this.map = map;
@@ -97,6 +90,81 @@ public final class GraphJobMessage imple
addAll(values);
}
+ public GraphJobMessage(WritableComparable<?> vertexID, byte[] valuesBytes,
+ int numOfValues) {
+ this.flag = VERTEX_FLAG;
+ this.vertexId = vertexID;
+ try {
+ this.bufferDos.write(valuesBytes);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ this.numOfValues = numOfValues;
+ }
+
+ public MapWritable getMap() {
+ return map;
+ }
+
+ public WritableComparable<?> getVertexId() {
+ return vertexId;
+ }
+
+ private ByteArrayInputStream bis = null;
+ private DataInputStream dis = null;
+ List<Writable> valuesCache;
+
+ public List<Writable> getValues() {
+ bis = new ByteArrayInputStream(byteBuffer.toByteArray());
+ dis = new DataInputStream(bis);
+
+ valuesCache = new ArrayList<Writable>();
+
+ if (valuesCache.isEmpty()) {
+ for (int i = 0; i < numOfValues; i++) {
+ try {
+ Writable v = GraphJobRunner.createVertexValue();
+ v.readFields(dis);
+ valuesCache.add(v);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ return valuesCache;
+ }
+
+ public byte[] getValuesBytes() {
+ return byteBuffer.toByteArray();
+ }
+
+ public void addValuesBytes(byte[] values, int numOfValues) {
+ try {
+ bufferDos.write(values);
+ this.numOfValues += numOfValues;
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public void add(Writable value) {
+ try {
+ value.write(bufferDos);
+ numOfValues++;
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public void addAll(List<Writable> values) {
+ for (Writable v : values)
+ add(v);
+ }
+
public GraphJobMessage(IntWritable size) {
this.flag = VERTICES_SIZE_FLAG;
this.integerMessage = size;
@@ -185,53 +253,6 @@ public final class GraphJobMessage imple
return 0;
}
- public MapWritable getMap() {
- return map;
- }
-
- @SuppressWarnings("rawtypes")
- public WritableComparable getVertexId() {
- return vertexId;
- }
-
- private ByteArrayInputStream bis = null;
- private DataInputStream dis = null;
-
- public List<Writable> getVertexValue() {
- bis = new ByteArrayInputStream(byteBuffer.toByteArray());
- dis = new DataInputStream(bis);
-
- List<Writable> valuesCache = new ArrayList<Writable>();
-
- for (int i = 0; i < numOfValues; i++) {
- try {
- Writable v = GraphJobRunner.createVertexValue();
- v.readFields(dis);
- valuesCache.add(v);
- } catch (IOException e) {
- System.out.println(i + ", " + numOfValues);
- e.printStackTrace();
- }
- }
-
- return valuesCache;
- }
-
- public void add(Writable value) {
- try {
- bufferDos.write(serialize(value));
- numOfValues++;
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public void addAll(List<Writable> values) {
- for (Writable v : values)
- add(v);
- }
-
/**
* @return the number of values
*/
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Feb 5 04:15:50 2015
@@ -236,7 +236,7 @@ public final class GraphJobRunner<V exte
while (currentMessage != null) {
vertex = vertices.get((V) currentMessage.getVertexId());
- msgs = (List<M>) currentMessage.getVertexValue();
+ msgs = (List<M>) currentMessage.getValues();
if (vertex.isHalted()) {
vertex.setActive();
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Thu Feb 5 04:15:50 2015
@@ -66,7 +66,7 @@ public class IncomingVertexMessageManage
@Override
public void add(GraphJobMessage item) {
if (item.isVertexMessage()) {
- msgPerVertex.add(item.getVertexId(), item.getVertexValue());
+ msgPerVertex.add(item.getVertexId(), item);
} else if (item.isMapMessage() || item.isVerticesSizeMessage()) {
mapMessages.add(item);
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java Thu Feb 5 04:15:50 2015
@@ -18,11 +18,9 @@
package org.apache.hama.graph;
import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class MessagePerVertex {
@@ -43,12 +41,11 @@ public class MessagePerVertex {
storage.put(vertexId, graphJobMessage);
}
- @SuppressWarnings("rawtypes")
- public void add(WritableComparable vertexID, List<Writable> values) {
+ public void add(WritableComparable<?> vertexID, GraphJobMessage msg) {
if (storage.containsKey(vertexID)) {
- storage.get(vertexID).addAll(values);
+ storage.get(vertexID).addValuesBytes(msg.getValuesBytes(), msg.size());
} else {
- put(vertexID, new GraphJobMessage(vertexID, values));
+ put(vertexID, msg);
}
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Thu Feb 5 04:15:50 2015
@@ -58,27 +58,26 @@ public class OutgoingVertexMessageManage
}
}
- @SuppressWarnings("rawtypes")
@Override
public void addMessage(String peerName, GraphJobMessage msg) {
InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
if (msg.isVertexMessage()) {
- WritableComparable vertexID = msg.getVertexId();
+ WritableComparable<?> vertexID = msg.getVertexId();
if (!storage.containsKey(targetPeerAddress)) {
storage.put(targetPeerAddress, new MessagePerVertex());
}
MessagePerVertex msgPerVertex = storage.get(targetPeerAddress);
- msgPerVertex.add(vertexID, msg.getVertexValue());
+ msgPerVertex.add(vertexID, msg);
// Combining messages
if (combiner != null
- && msgPerVertex.get(vertexID).getVertexValue().size() > 1) {
+ && msgPerVertex.get(vertexID).getValues().size() > 1) {
storage.get(targetPeerAddress).put(
vertexID,
new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
- vertexID).getVertexValue())));
+ vertexID).getValues())));
}
} else {
outgoingBundles.get(targetPeerAddress).addMessage(msg);