You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/02/05 05:15:50 UTC

svn commit: r1657473 - in /hama/trunk/graph/src/main/java/org/apache/hama/graph: GraphJobMessage.java GraphJobRunner.java IncomingVertexMessageManager.java MessagePerVertex.java OutgoingVertexMessageManager.java

Author: edwardyoon
Date: Thu Feb  5 04:15:50 2015
New Revision: 1657473

URL: http://svn.apache.org/r1657473
Log:
This patch improves the data structure of GraphJobMessage.

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Feb  5 04:15:50 2015
@@ -71,13 +71,6 @@ public final class GraphJobMessage imple
   public GraphJobMessage() {
   }
 
-  public byte[] serialize(Writable message) throws IOException {
-    ByteArrayOutputStream mbos = new ByteArrayOutputStream();
-    DataOutputStream mdos = new DataOutputStream(mbos);
-    message.write(mdos);
-    return mbos.toByteArray();
-  }
-
   public GraphJobMessage(MapWritable map) {
     this.flag = MAP_FLAG;
     this.map = map;
@@ -97,6 +90,81 @@ public final class GraphJobMessage imple
     addAll(values);
   }
 
+  public GraphJobMessage(WritableComparable<?> vertexID, byte[] valuesBytes,
+      int numOfValues) {
+    this.flag = VERTEX_FLAG;
+    this.vertexId = vertexID;
+    try {
+      this.bufferDos.write(valuesBytes);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    this.numOfValues = numOfValues;
+  }
+
+  public MapWritable getMap() {
+    return map;
+  }
+
+  public WritableComparable<?> getVertexId() {
+    return vertexId;
+  }
+
+  private ByteArrayInputStream bis = null;
+  private DataInputStream dis = null;
+  List<Writable> valuesCache;
+
+  public List<Writable> getValues() {
+    bis = new ByteArrayInputStream(byteBuffer.toByteArray());
+    dis = new DataInputStream(bis);
+
+    valuesCache = new ArrayList<Writable>();
+
+    if (valuesCache.isEmpty()) {
+      for (int i = 0; i < numOfValues; i++) {
+        try {
+          Writable v = GraphJobRunner.createVertexValue();
+          v.readFields(dis);
+          valuesCache.add(v);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    return valuesCache;
+  }
+
+  public byte[] getValuesBytes() {
+    return byteBuffer.toByteArray();
+  }
+
+  public void addValuesBytes(byte[] values, int numOfValues) {
+    try {
+      bufferDos.write(values);
+      this.numOfValues += numOfValues;
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  public void add(Writable value) {
+    try {
+      value.write(bufferDos);
+      numOfValues++;
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  public void addAll(List<Writable> values) {
+    for (Writable v : values)
+      add(v);
+  }
+
   public GraphJobMessage(IntWritable size) {
     this.flag = VERTICES_SIZE_FLAG;
     this.integerMessage = size;
@@ -185,53 +253,6 @@ public final class GraphJobMessage imple
     return 0;
   }
 
-  public MapWritable getMap() {
-    return map;
-  }
-
-  @SuppressWarnings("rawtypes")
-  public WritableComparable getVertexId() {
-    return vertexId;
-  }
-
-  private ByteArrayInputStream bis = null;
-  private DataInputStream dis = null;
-
-  public List<Writable> getVertexValue() {
-    bis = new ByteArrayInputStream(byteBuffer.toByteArray());
-    dis = new DataInputStream(bis);
-
-    List<Writable> valuesCache = new ArrayList<Writable>();
-
-    for (int i = 0; i < numOfValues; i++) {
-      try {
-        Writable v = GraphJobRunner.createVertexValue();
-        v.readFields(dis);
-        valuesCache.add(v);
-      } catch (IOException e) {
-        System.out.println(i + ", " + numOfValues);
-        e.printStackTrace();
-      }
-    }
-
-    return valuesCache;
-  }
-
-  public void add(Writable value) {
-    try {
-      bufferDos.write(serialize(value));
-      numOfValues++;
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-  }
-
-  public void addAll(List<Writable> values) {
-    for (Writable v : values)
-      add(v);
-  }
-
   /**
    * @return the number of values
    */

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Feb  5 04:15:50 2015
@@ -236,7 +236,7 @@ public final class GraphJobRunner<V exte
     while (currentMessage != null) {
       vertex = vertices.get((V) currentMessage.getVertexId());
       
-      msgs = (List<M>) currentMessage.getVertexValue();
+      msgs = (List<M>) currentMessage.getValues();
       if (vertex.isHalted()) {
         vertex.setActive();
       }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Thu Feb  5 04:15:50 2015
@@ -66,7 +66,7 @@ public class IncomingVertexMessageManage
   @Override
   public void add(GraphJobMessage item) {
     if (item.isVertexMessage()) {
-      msgPerVertex.add(item.getVertexId(), item.getVertexValue());
+      msgPerVertex.add(item.getVertexId(), item);
     } else if (item.isMapMessage() || item.isVerticesSizeMessage()) {
       mapMessages.add(item);
     }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java Thu Feb  5 04:15:50 2015
@@ -18,11 +18,9 @@
 package org.apache.hama.graph;
 
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 public class MessagePerVertex {
@@ -43,12 +41,11 @@ public class MessagePerVertex {
     storage.put(vertexId, graphJobMessage);
   }
 
-  @SuppressWarnings("rawtypes")
-  public void add(WritableComparable vertexID, List<Writable> values) {
+  public void add(WritableComparable<?> vertexID, GraphJobMessage msg) {
     if (storage.containsKey(vertexID)) {
-      storage.get(vertexID).addAll(values);
+      storage.get(vertexID).addValuesBytes(msg.getValuesBytes(), msg.size());
     } else {
-      put(vertexID, new GraphJobMessage(vertexID, values));
+      put(vertexID, msg);
     }
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1657473&r1=1657472&r2=1657473&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Thu Feb  5 04:15:50 2015
@@ -58,27 +58,26 @@ public class OutgoingVertexMessageManage
     }
   }
 
-  @SuppressWarnings("rawtypes")
   @Override
   public void addMessage(String peerName, GraphJobMessage msg) {
     InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
     if (msg.isVertexMessage()) {
-      WritableComparable vertexID = msg.getVertexId();
+      WritableComparable<?> vertexID = msg.getVertexId();
 
       if (!storage.containsKey(targetPeerAddress)) {
         storage.put(targetPeerAddress, new MessagePerVertex());
       }
 
       MessagePerVertex msgPerVertex = storage.get(targetPeerAddress);
-      msgPerVertex.add(vertexID, msg.getVertexValue());
+      msgPerVertex.add(vertexID, msg);
 
       // Combining messages
       if (combiner != null
-          && msgPerVertex.get(vertexID).getVertexValue().size() > 1) {
+          && msgPerVertex.get(vertexID).getValues().size() > 1) {
         storage.get(targetPeerAddress).put(
             vertexID,
             new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
-                vertexID).getVertexValue())));
+                vertexID).getValues())));
       }
     } else {
       outgoingBundles.get(targetPeerAddress).addMessage(msg);