You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/17 02:25:42 UTC

svn commit: r1674173 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/message/compress/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/

Author: edwardyoon
Date: Fri Apr 17 00:25:42 2015
New Revision: 1674173

URL: http://svn.apache.org/r1674173
Log:
HAMA-951: Make BSPMessageBundle thread-safe

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Fri Apr 17 00:25:42 2015
@@ -17,20 +17,18 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
+import org.apache.hama.util.ReflectionUtils;
 
 /**
  * BSPMessageBundle stores a group of messages so that they can be sent in batch
@@ -42,15 +40,9 @@ public class BSPMessageBundle<M extends
 
   public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
 
-  private String className = null;
-  private int bundleSize = 0;
-
-  private Kryo kryo = new Kryo();
-  private ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-  private Output output = new Output(outputStream, 4096);
+  private List<M> messages = new ArrayList<M>();
 
   public BSPMessageBundle() {
-    bundleSize = 0;
   }
 
   /**
@@ -59,115 +51,54 @@ public class BSPMessageBundle<M extends
    * @param message BSPMessage to add.
    */
   public void addMessage(M message) {
-    if (className == null) {
-      className = message.getClass().getName();
-      kryo.register(message.getClass());
-    }
-
-    kryo.writeObject(output, message);
-    bundleSize++;
-  }
-  
-  public void addMessages(Iterator<M> iterator) {
-    M message = iterator.next();
-    if (className == null) {
-      className = message.getClass().getName();
-      kryo.register(message.getClass());
-    }
-    
-    kryo.writeObject(output, message);
-    bundleSize++;
-    
-    while(iterator.hasNext()) {
-      kryo.writeObject(output, iterator.next());
-      bundleSize++;
-    }
-  }
-  
-  public void finishAddition() {
-    output.flush();
-  }
-  
-  public byte[] getBuffer() {
-    return outputStream.toByteArray();
+    messages.add(message);
   }
 
-  private ByteArrayInputStream bis = null;
-  private Input in = null;
+  public void addMessages(Collection<M> msgs) {
+    messages.addAll(msgs);
+  }
 
   public Iterator<M> iterator() {
-    bis = new ByteArrayInputStream(outputStream.toByteArray());
-    in = new Input(bis, 4096);
-
-    Iterator<M> it = new Iterator<M>() {
-      Class<M> clazz = null;
-      int counter = 0;
-
-      @Override
-      public boolean hasNext() {
-        if ((bundleSize - counter) > 0) {
-          return true;
-        } else {
-          return false;
-        }
-      }
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public M next() {
-        try {
-          if (clazz == null) {
-            clazz = (Class<M>) Class.forName(className);
-          }
-        } catch (ClassNotFoundException ce) {
-          LOG.error("Class was not found.", ce);
-        }
-
-        counter++;
-
-        return kryo.readObject(in, clazz);
-      }
-
-      @Override
-      public void remove() {
-        // TODO Auto-generated method stub
-      }
-    };
-    return it;
+    return messages.iterator();
   }
 
   public int size() {
-    return bundleSize;
-  }
-
-  /**
-   * @return the byte length of messages
-   * @throws IOException
-   */
-  public long getLength() {
-    return outputStream.size();
+    return messages.size();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(bundleSize);
-    if (bundleSize > 0) {
-      out.writeUTF(className);
-      out.writeInt(outputStream.size());
-      out.write(outputStream.toByteArray());
+    out.writeInt(messages.size());
+
+    if (messages.size() > 0) {
+      Class<M> clazz = (Class<M>) messages.get(0).getClass();
+      out.writeUTF(clazz.getName());
+
+      for (M m : messages) {
+        m.write(out);
+      }
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void readFields(DataInput in) throws IOException {
-    this.bundleSize = in.readInt();
+    int num = in.readInt();
 
-    if (this.bundleSize > 0) {
-      className = in.readUTF();
-      int bytesLength = in.readInt();
-      byte[] temp = new byte[bytesLength];
-      in.readFully(temp);
-      outputStream.write(temp);
+    if (num > 0) {
+      Class<M> clazz = null;
+      try {
+        clazz = (Class<M>) Class.forName(in.readUTF());
+      } catch (ClassNotFoundException e) {
+        LOG.error("Class was not found.", e);
+      }
+
+      for (int i = 0; i < num; i++) {
+        M msg = ReflectionUtils.newInstance(clazz);
+        msg.readFields(in);
+        messages.add(msg);
+      }
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java Fri Apr 17 00:25:42 2015
@@ -17,7 +17,6 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.io.Writable;
@@ -41,15 +40,4 @@ public interface BSPMessageBundleInterfa
    */
   public Iterator<M> iterator();
   
-  /**
-   * @return the message buffer.
-   */
-  public byte[] getBuffer();
-  
-  /**
-   * @return the total byte length of messages
-   * @throws IOException
-   */
-  public long getLength();
-  
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri Apr 17 00:25:42 2015
@@ -349,8 +349,8 @@ public class LocalBSPRunner implements J
     @Override
     public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
         throws IOException {
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
-          bundle.getLength());
+      //peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+      //    bundle.getLength());
       
       MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);
       peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Fri Apr 17 00:25:42 2015
@@ -133,8 +133,8 @@ public final class HamaAsyncMessageManag
             compressed.length);
         bspPeerConnection.put(compressed);
       } else {
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
-            bundle.getLength());
+        //peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+        //    bundle.getLength());
         bspPeerConnection.put(bundle);
       }
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Fri Apr 17 00:25:42 2015
@@ -133,7 +133,7 @@ public final class HamaMessageManagerImp
         peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_DECOMPRESSED_BYTES, byteBuffer.size());
         bspPeerConnection.put(compressed);
       } else {
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED, bundle.getLength());
+        //peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED, bundle.getLength());
         bspPeerConnection.put(bundle);
       }
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Fri Apr 17 00:25:42 2015
@@ -70,9 +70,6 @@ public class OutgoingPOJOMessageBundle<M
 
   @Override
   public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getBundleIterator() {
-    for(BSPMessageBundle<M> b : outgoingBundles.values()) {
-      b.finishAddition();
-    }
     return outgoingBundles.entrySet().iterator();
   }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Fri Apr 17 00:25:42 2015
@@ -38,7 +38,6 @@ public class TestBSPMessageBundle extend
     // Serialize it.
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     bundle.write(new DataOutputStream(baos));
-    bundle.finishAddition();
     baos.close();
     // Deserialize it.
     BSPMessageBundle<BytesWritable> readBundle = new BSPMessageBundle<BytesWritable>();
@@ -65,7 +64,6 @@ public class TestBSPMessageBundle extend
       testMessages[i] = msg;
       bundle.addMessage(testMessages[i]);
     }
-    bundle.finishAddition();
     
     // Serialize it.
     ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Fri Apr 17 00:25:42 2015
@@ -653,7 +653,6 @@ public class TestCheckpoint extends Test
 
     BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
     assertEquals(5, bundleRead.size());
-    bundleRead.finishAddition();
     
     String recoveredMsg = bundleRead.iterator().next().toString();
     assertEquals(recoveredMsg, "data");

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java Fri Apr 17 00:25:42 2015
@@ -91,8 +91,6 @@ public class TestHamaAsyncMessageManager
       bundle.addMessage(it.next());
     }
 
-    bundle.finishAddition();
-    
     messageManager.transfer(peer, bundle);
 
     messageManager.clearOutgoingMessages();

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java Fri Apr 17 00:25:42 2015
@@ -90,7 +90,6 @@ public class TestHamaMessageManager exte
     while (it.hasNext()) {
       bundle.addMessage(it.next());
     }
-    bundle.finishAddition();
     
     messageManager.transfer(peer, bundle);
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Fri Apr 17 00:25:42 2015
@@ -50,8 +50,6 @@ public class TestBSPMessageCompressor ex
       a.addMessage(new IntWritable(i));
     }
 
-    a.finishAddition();
-    
     ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
     DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
     a.write(bufferDos);

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Fri Apr 17 00:25:42 2015
@@ -83,7 +83,7 @@ public class PageRankTest extends TestCa
 
   private void generateTestData() {
     try {
-      FastGraphGen.main(new String[] { "-v", "60", "-e", "3", "-output_path",
+      FastGraphGen.main(new String[] { "-v", "30", "-e", "3", "-output_path",
           INPUT, "-task_num", "3", "-of", "json"});
     } catch (Exception e) {
       e.printStackTrace();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Fri Apr 17 00:25:42 2015
@@ -55,7 +55,6 @@ public final class GraphJobMessage imple
   private WritableComparable vertexId;
   private IntWritable integerMessage;
   private static GraphJobMessageComparator comparator;
-  private Vertex<?, ?, ?> vertex;
   
   private int numOfValues = 0;
 
@@ -91,27 +90,10 @@ public final class GraphJobMessage imple
     addAll(values);
   }
 
-  public GraphJobMessage(WritableComparable<?> vertexID, byte[] valuesBytes,
-      int numOfValues) {
-    this.flag = VERTEX_FLAG;
-    this.vertexId = vertexID;
-    try {
-      this.byteBuffer.write(valuesBytes);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    this.numOfValues = numOfValues;
-  }
-
   public MapWritable getMap() {
     return map;
   }
 
-  public Vertex<?, ?, ?> getVertex() {
-    return vertex;
-  }
-  
   public WritableComparable<?> getVertexId() {
     return vertexId;
   }
@@ -169,7 +151,8 @@ public final class GraphJobMessage imple
 
   public GraphJobMessage(Vertex<?, ?, ?> vertex) {
     this.flag = PARTITION_FLAG;
-    this.vertex = vertex;
+    
+    add(vertex);
   }
 
   @Override
@@ -188,7 +171,9 @@ public final class GraphJobMessage imple
     } else if (isVerticesSizeMessage()) {
       integerMessage.write(out);
     } else if (isPartitioningMessage()) {
-      vertex.write(out);
+      out.writeInt(numOfValues);
+      out.writeInt(byteBuffer.size());
+      out.write(byteBuffer.toByteArray());
     } else {
       vertexId.write(out);
     }
@@ -235,8 +220,11 @@ public final class GraphJobMessage imple
       integerMessage = new IntWritable();
       integerMessage.readFields(in);
     } else if (isPartitioningMessage()) {
-      vertex = (Vertex<?, ?, ?>) ReflectionUtils.newInstance(GraphJobRunner.VERTEX_CLASS, null);
-      vertex.readFields(in);
+      this.numOfValues = in.readInt();
+      int bytesLength = in.readInt();
+      byte[] temp = new byte[bytesLength];
+      in.readFully(temp);
+      byteBuffer.write(temp);
     } else {
       vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
           null);
@@ -296,7 +284,7 @@ public final class GraphJobMessage imple
       return "#Vertices: " + integerMessage;
     } else {
       return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
-          + vertexId + ", vertexValue=" + numOfValues + ", " + vertex.toString() + "]";
+          + vertexId + ", vertexValue=" + numOfValues + "]";
     }
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Fri Apr 17 00:25:42 2015
@@ -17,9 +17,13 @@
  */
 package org.apache.hama.graph;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -278,12 +282,12 @@ public final class GraphJobRunner<V exte
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
 
-    ExecutorService executor = Executors.newFixedThreadPool((vertices.size()
-        / conf.getInt("hama.graph.threadpool.percentage", 10)) + 1);
+    ExecutorService executor = Executors
+        .newFixedThreadPool((vertices.size() / conf.getInt(
+            "hama.graph.threadpool.percentage", 10)) + 1);
 
     for (Vertex<V, E, M> v : vertices.getValues()) {
-      Runnable worker = new ComputeRunnable(v, Collections.singleton(v
-          .getValue()));
+      Runnable worker = new ComputeRunnable(v, null);
       executor.execute(worker);
     }
     executor.shutdown();
@@ -308,8 +312,10 @@ public final class GraphJobRunner<V exte
     public void run() {
       try {
         // call once at initial superstep
-        if(iteration == 0)
+        if (iteration == 0) {
           vertex.setup(conf);
+          msgs = Collections.singleton(vertex.getValue());
+        }
 
         vertex.compute(msgs);
         vertices.finishVertexComputation(vertex);
@@ -370,6 +376,8 @@ public final class GraphJobRunner<V exte
     EDGE_VALUE_CLASS = edgeValueClass;
   }
 
+  Map<String, GraphJobMessage> messages = new HashMap<String, GraphJobMessage>();
+
   /**
    * Loads vertices into memory of each peer.
    */
@@ -377,6 +385,9 @@ public final class GraphJobRunner<V exte
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
+    // kryo.register(GraphJobRunner
+    // .<V, E, M> newVertexInstance(VERTEX_CLASS).getClass());
+
     VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
         .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
             VertexInputReader.class));
@@ -401,19 +412,38 @@ public final class GraphJobRunner<V exte
           Runnable worker = new LoadWorker(vertex);
           executor.execute(worker);
         } else {
-          peer.send(dstHost, new GraphJobMessage(vertex));
+          if (!messages.containsKey(dstHost)) {
+            messages.put(dstHost, new GraphJobMessage(vertex));
+          } else {
+            messages.get(dstHost).add(vertex);
+          }
         }
       }
     } catch (Exception e) {
       e.printStackTrace();
     }
 
+    for (Entry<String, GraphJobMessage> e : messages.entrySet()) {
+      peer.send(e.getKey(), e.getValue());
+    }
+    messages.clear();
+    messages = null;
+    
     peer.sync();
 
     GraphJobMessage msg;
     while ((msg = peer.getCurrentMessage()) != null) {
-      Runnable worker = new LoadWorker((Vertex<V, E, M>) msg.getVertex());
-      executor.execute(worker);
+      ByteArrayInputStream bis = new ByteArrayInputStream(msg.getValuesBytes());
+      DataInputStream dis = new DataInputStream(bis);
+
+      for (int i = 0; i < msg.getNumOfValues(); i++) {
+        Vertex<V, E, M> vertex = GraphJobRunner
+            .<V, E, M> newVertexInstance(VERTEX_CLASS);
+        vertex.readFields(dis);
+
+        Runnable worker = new LoadWorker(vertex);
+        executor.execute(worker);
+      }
     }
     executor.shutdown();
     while (!executor.isTerminated()) {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java Fri Apr 17 00:25:42 2015
@@ -17,7 +17,7 @@
  */
 package org.apache.hama.graph;
 
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -62,8 +62,8 @@ public class MessagePerVertex {
     return storage.get(vertexID);
   }
 
-  public Iterator<GraphJobMessage> iterator() {
-    return storage.values().iterator();
+  public Collection<GraphJobMessage> getMessages() {
+    return storage.values();
   }
 
   public GraphJobMessage pollFirstEntry() {
@@ -71,7 +71,7 @@ public class MessagePerVertex {
   }
 
   public List<List<GraphJobMessage>> getSubLists(int num) {
-    return Lists.partition(Lists.newArrayList(iterator()), num);
+    return Lists.partition(Lists.newArrayList(storage.values()), num);
   }
 
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1674173&r1=1674172&r2=1674173&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Fri Apr 17 00:25:42 2015
@@ -108,9 +108,8 @@ public class OutgoingVertexMessageManage
 
         MessagePerVertex msgStorage = storage.get(bundle.getKey());
         if (msgStorage != null) {
-          bundle.getValue().addMessages(msgStorage.iterator());
+          bundle.getValue().addMessages(msgStorage.getMessages());
         }
-        bundle.getValue().finishAddition();
         storage.remove(bundle.getKey());
         return bundle;
       }